Horovod: TensorFlow distributed training and validation using allreduce

Created on 8 Jul 2018 · 1 comment · Source: horovod/horovod

Hi @alsrgv, I am trying to run distributed validation after every N training steps, but the call to hvd.allreduce(auc) raises an error; presumably this needs to be done differently?

Error looks like this:
Training loop Error 23
Traceback (most recent call last):
  File line 305, in train_main
    hvd.allreduce(auc)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/horovod/tensorflow/__init__.py", line 81, in allreduce
    horovod_size = tf.cast(size(), tensor.dtype)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/ops/math_ops.py", line 785, in cast
    x = ops.convert_to_tensor(x, name="x")
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 1014, in convert_to_tensor
    as_ref=False)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 1104, in internal_convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/constant_op.py", line 235, in _constant_tensor_conversion_function
    return constant(v, dtype=dtype, name=name)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/constant_op.py", line 220, in constant
    name=name).outputs[0]
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3364, in create_op
    self._check_not_finalized()
  File "/usr/local/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3002, in _check_not_finalized
    raise RuntimeError("Graph is finalized and cannot be modified.")
RuntimeError: Graph is finalized and cannot be modified.

Relevant code:
p = tf.slice(y, [0, 0], [-1, 1])
lbl = tf.slice(y_, [0, 0], [-1, 1])
auc, auc_op = tf.metrics.auc(labels=lbl, predictions=p, curve='ROC')

config = tf.ConfigProto(allow_soft_placement=True) #, log_device_placement=True
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())

checkpoint_dir = flags.run_dir + '/checkpoint' if hvd.rank() == 0 else None

with graph.as_default():

hooks = [

    ds_handle_hook,

    # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
    # from rank 0 to all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with random weights
    # or restored from a checkpoint.
    hvd.BroadcastGlobalVariablesHook(0),

    # Horovod: adjust number of steps based on number of GPUs.
    tf.train.StopAtStepHook(last_step=200000000 // hvd.size()),
    tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': cost}, every_n_iter=100)

]

with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       hooks=hooks,
                                       config=config) as sess:

    print("Session start", hvd.rank())

    train_writer = tf.summary.FileWriter(flags.run_dir + '/checkpoint/train', sess.graph)
    test_writer = tf.summary.FileWriter(flags.run_dir + '/checkpoint/valid')

    #estimate parameters
    trainable_params = np.sum([np.prod(v.shape) for v in tf.trainable_variables()])

    print("Trainable_params", trainable_params)

    #FIXME - configure
    train_steps = 10000
    valid_steps = 100
    t_steps = train_steps
    v_steps = 0

    while not sess.should_stop():

        try:

            if t_steps > 0:

                _, g_step = \
                    sess.run([train_op, global_step],
                             feed_dict={ds_handle: ds_handle_hook.train_handle})

                if t_steps % 100 == 0:
                    t_summary, t_auc, t_rmse = \
                        sess.run([merged_summary_op, auc, rmse],
                                 feed_dict={ds_handle: ds_handle_hook.train_handle})

                    train_writer.add_summary(t_summary, g_step)

                t_steps -= 1

            else:
                t_steps = 0
                v_steps = valid_steps

                print("Validating", hvd.rank())

                while v_steps > 0:
                    #validate

                    v_summary, v_auc, v_rmse, g_step = \
                        sess.run([merged_summary_op, auc, rmse, global_step],
                                 feed_dict={ds_handle: ds_handle_hook.valid_handle})

                    hvd.allreduce(auc)

                    test_writer.add_summary(v_summary, g_step)

                    v_steps -= 1

                v_steps = 0
                t_steps = train_steps

        except Exception:

            exc_type, exc_value, exc_traceback = sys.exc_info()
            lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
            print("Training loop Error",hvd.rank(),lines)

Most helpful comment

@vitalyli, MonitoredTrainingSession prohibits further graph modifications. Adding operations such as hvd.allreduce() counts as a graph modification, which is what causes your error. A simple solution is to define avg_auc = hvd.allreduce(auc) immediately after you define auc, and then use avg_auc in sess.run().
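
For reference, a minimal sketch of that suggestion applied to the snippet above (variable names taken from the original post and the comment; untested):

p = tf.slice(y, [0, 0], [-1, 1])
lbl = tf.slice(y_, [0, 0], [-1, 1])
auc, auc_op = tf.metrics.auc(labels=lbl, predictions=p, curve='ROC')

# Horovod: build the allreduce op here, while the graph is still mutable,
# so the cross-rank average of the AUC is part of the graph before
# MonitoredTrainingSession finalizes it.
avg_auc = hvd.allreduce(auc)

# ... graph construction and session setup as above ...

# Inside the validation loop, run the pre-built op instead of calling
# hvd.allreduce() on the fly:
v_summary, v_auc, v_rmse, g_step = \
    sess.run([merged_summary_op, avg_auc, rmse, global_step],
             feed_dict={ds_handle: ds_handle_hook.valid_handle})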

>All comments

@vitalyli, MonitoredTrainingSession prohibits further graph modifications. Adding operations like hvd.allreduce() counts as graph modification, which causes your error. A simple solution would be to define avg_auc = hvd.allreduce(auc) immediately after you define it, and use avg_auc in sess.run().

Was this page helpful?
0 / 5 - 0 ratings

Related issues

ildoonet picture ildoonet  路  3Comments

kit1980 picture kit1980  路  3Comments

chgxtony picture chgxtony  路  3Comments

kangp3 picture kangp3  路  3Comments

waynezhang2018 picture waynezhang2018  路  3Comments