Horovod (TF) hangs after shutting down and reinitializing with dynamically spawned processes

Created on 29 Nov 2018 · 3 comments · Source: horovod/horovod

I'm working on an application in which worker processes are spawned dynamically from the master process. I can train a network with the first set of spawned processes (together with the master), but not a second time after calling hvd.shutdown() and spawning new processes, even though the new processes can still communicate with the master. I've tried various permutations of calling Free and Disconnect on the communicators, with no luck.

It works fine if all of the processes are started at the beginning and none are spawned. (Note that, in this case, hvd.shutdown() causes a segfault unless a communicator, even MPI.COMM_WORLD, is passed explicitly to hvd.init.)
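
For reference, here's a minimal sketch of the static case I mean: all ranks are started by mpiexec up front (e.g. mpiexec -n 2 python3 static_case.py inside the same image), and a communicator is passed explicitly to hvd.init. The file name and the toy model are made up for illustration; it's the same idea as the MWE below, minus the spawning.

import tensorflow as tf
from mpi4py import MPI
import horovod.tensorflow as hvd

# Pass the communicator explicitly; without this, hvd.shutdown() segfaults for me.
hvd.init(MPI.COMM_WORLD)

config = tf.ConfigProto()
# both ranks share GPU 0, as in the MWE below
config.gpu_options.visible_device_list = str(0)
config.gpu_options.allow_growth = True

graph = tf.Graph()
with graph.as_default():
    w = tf.Variable(tf.random_normal((3, 2)))
    loss = tf.reduce_mean(tf.square(w))
    opt = hvd.DistributedOptimizer(tf.train.AdamOptimizer(0.1))
    train_op = opt.minimize(loss)

with tf.Session(graph=graph, config=config) as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(hvd.broadcast_global_variables(0))
    for i in range(5):
        sess.run(train_op)

# Returns cleanly here, since all ranks were started by mpiexec.
hvd.shutdown()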

I've written an MWE:

import sys
import os
from numpy.random import uniform
import tensorflow as tf
from mpi4py import MPI
import horovod.tensorflow as hvd

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# the 'worker' arg is passed to the worker, to distinguish it from the master
# call with 
# nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py

def send_and_print_msg(comm, msg):
    comm.bcast(msg, root=comm.rank)
    print("%d: sent message: %s" % (comm.rank, msg))

def receive_and_print_msg(comm, root):
    msg = None
    msg = comm.bcast(msg, root)
    print("%d: received message from %d: %s" % (comm.rank, root, msg))

def spawn():
    # spawn one worker process
    print("Spawning worker processes.")
    icomm = MPI.COMM_SELF.Spawn("python3", maxprocs=1, args=['mwe_dynamic.py', 'worker'])
    # merge the parent's world with the children's world, putting the parent before the children
    comm = icomm.Merge(False)
    print("Rank %d, PID %s" % (comm.rank, os.getpid()))
    # test comm
    send_and_print_msg(comm, os.getpid())
    receive_and_print_msg(comm, 1)
    return icomm, comm

def worker_init():
    print("Getting parent comm.")
    icomm = MPI.Comm.Get_parent()
    # merge the parent's world with the children's world, putting the parent before the children
    comm = icomm.Merge(True)
    print("Rank %d, PID %s" % (comm.rank, os.getpid()))
    # test comm
    receive_and_print_msg(comm, 0)
    send_and_print_msg(comm, os.getpid())
    return icomm, comm

def train(comm):
    # Initialize Horovod
    hvd.init(comm)
    print("%d: multithreading supported: %s" % (comm.rank, hvd.mpi_threads_supported()))

    config = tf.ConfigProto()
    # put them both on GPU 0
    config.gpu_options.visible_device_list = str(0) # str(comm.rank)
    config.gpu_options.allow_growth = True

    # Build model: ||w-x||
    graph = tf.Graph()
    with graph.as_default():
        shape = (3,2)
        w = tf.Variable(uniform(size=shape))
        x = tf.random_normal(shape=shape)
        loss = tf.losses.mean_squared_error(w, x)
        opt = tf.train.AdamOptimizer(0.1)

        # Add Horovod Distributed Optimizer
        opt = hvd.DistributedOptimizer(opt)

        # Make training operation
        train_op = opt.minimize(loss)

    with tf.Session(graph=graph, config=config) as sess:
        print("%d: initialize variables" % comm.rank)
        sess.run(tf.global_variables_initializer())
        # broadcast variables from rank 0 to all other processes during
        # initialization.
        print("%d: broadcast variables" % comm.rank)
        sess.run(hvd.broadcast_global_variables(0))
        # test comm again
        if comm.rank == 0:
            send_and_print_msg(comm, os.getpid())
            receive_and_print_msg(comm, 1)
        elif comm.rank == 1:
            receive_and_print_msg(comm, 0)
            send_and_print_msg(comm, os.getpid())
        for i in range(5):
            print("%d: iteration %d" % (comm.rank, i))
            sess.run(train_op)  # processes hang here in second run

    print("%d: hvd.shutdown()" % comm.rank)
    hvd.shutdown()

    # print("%d: comm.Free()" % comm.rank)
    # even before hvd.shutdown(): mpi4py.MPI.Exception: MPI_ERR_COMM: invalid communicator
    # comm.Free()

    # print("%d: comm.Disconnect()" % comm.rank)
    # processes hang here if this is called, before or after hvd.shutdown()
    # comm.Disconnect()

    # print("%d: icomm.Free()" % comm.rank)
    # makes no difference
    # icomm.Free()

    # print("%d: icomm.Disconnect()" % comm.rank)
    # makes no difference
    # icomm.Disconnect()

if "worker" not in sys.argv:
    for i in range(2):
        icomm, comm = spawn()
        train(comm)
else:
    icomm, comm = worker_init()
    train(comm)

Here's the command line output:

nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py
Spawning worker processes.
Getting parent comm.
Rank 0, PID 11
Rank 1, PID 24
0: sent message: 11
1: received message from 0: 11
0: received message from 1: 24
1: sent message: 24
1: multithreading supported: True
0: multithreading supported: True
1: initialize variables
0: initialize variables
1: broadcast variables
0: broadcast variables
1: received message from 0: 11
0: sent message: 11
1: sent message: 24
1: iteration 0
0: received message from 1: 24
0: iteration 0

0d28c03512b9:11:37 [0] misc/ibvwrap.cu:61 WARN Failed to open libibverbs.so[.1]
0d28c03512b9:11:37 [0] INFO Using internal Network Socket
0d28c03512b9:11:37 [0] INFO Using NCCL Low-latency algorithm for sizes below 16384
0d28c03512b9:11:37 [0] INFO NET : Using interface eth0:172.17.0.2<0>
0d28c03512b9:11:37 [0] INFO NET/Socket : 1 interfaces found
NCCL version 2.2.13+cuda9.0

0d28c03512b9:24:38 [0] misc/ibvwrap.cu:61 WARN Failed to open libibverbs.so[.1]
0d28c03512b9:24:38 [0] INFO Using internal Network Socket
0d28c03512b9:24:38 [0] INFO Using NCCL Low-latency algorithm for sizes below 16384
0d28c03512b9:24:38 [0] INFO comm 0x7ff58801b260 rank 1 nranks 2
0d28c03512b9:24:38 [0] INFO NET : Using interface eth0:172.17.0.2<0>
0d28c03512b9:24:38 [0] INFO NET/Socket : 1 interfaces found
0d28c03512b9:11:37 [0] INFO comm 0x7fc31401b830 rank 0 nranks 2
0d28c03512b9:11:37 [0] INFO Using 256 threads
0d28c03512b9:11:37 [0] INFO Min Comp Cap 6
0d28c03512b9:11:37 [0] INFO NCCL_SINGLE_RING_THRESHOLD=131072
0d28c03512b9:11:37 [0] INFO Ring 00 :    0   1
0d28c03512b9:24:38 [0] INFO Ring 00 : 1[0] -> 0[0] via P2P/IPC
0d28c03512b9:11:37 [0] INFO Ring 00 : 0[0] -> 1[0] via P2P/IPC
0d28c03512b9:11:37 [0] INFO Launch mode Parallel
1: iteration 1
0: iteration 1
1: iteration 2
0: iteration 2
1: iteration 3
0: iteration 3
1: iteration 4
0: iteration 4
1: hvd.shutdown()
0: hvd.shutdown()
Spawning worker processes.
Getting parent comm.
Rank 0, PID 11
Rank 1, PID 132
0: sent message: 11
1: received message from 0: 11
0: received message from 1: 132
1: sent message: 132
0: multithreading supported: True
1: multithreading supported: True
0: initialize variables
0: broadcast variables
1: initialize variables
1: broadcast variables
0: sent message: 11
1: received message from 0: 11
1: sent message: 132
0: received message from 1: 132
0: iteration 0
1: iteration 0

The processes hang here.

Labels: question, wontfix

All 3 comments

@michaelstjules, were you able to resolve your issue per our discussion? If so, could you write up a solution that worked for you?

@alsrgv Yes, training only on the spawned processes using MPI.COMM_WORLD works:

import sys
import os
from numpy.random import uniform
import tensorflow as tf
from mpi4py import MPI
import horovod.tensorflow as hvd
from time import sleep

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
NUM_WORKERS = 3
NUM_CYCLES = 3

# the 'worker' arg is passed to the workers, to distinguish them from the master
# call with 
# nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py


def spawn():
    # spawn NUM_WORKERS worker processes
    print("Spawning worker processes.")
    icomm = MPI.COMM_SELF.Spawn("python3", maxprocs=NUM_WORKERS, args=['mwe_dynamic.py', 'worker'])
    # merge the parent's world with the children's world, putting the parent before the children
    merged_comm = icomm.Merge(False)
    comm_world = MPI.COMM_WORLD
    print("Master: comm_world rank %d of %d, PID %s" % (comm_world.rank, comm_world.size, os.getpid()))
    print("Master: merged_comm rank %d of %d, PID %s" % (merged_comm.rank, merged_comm.size, os.getpid()))
    return icomm, merged_comm


def worker_init():
    print("Getting parent comm.")
    icomm = MPI.Comm.Get_parent()
    # merge the parent's world with the children's world, putting the parent before the children
    merged_comm = icomm.Merge(True)
    comm_world = MPI.COMM_WORLD
    print("Worker: comm_world rank %d of %d, PID %s" % (comm_world.rank, comm_world.size, os.getpid()))
    print("Worker: merged_comm rank %d of %d, PID %s" % (merged_comm.rank, merged_comm.size, os.getpid()))
    return icomm, merged_comm


def train(comm):
    print("%d: Initializing Horovod." % comm.rank)
    hvd.init(comm)
    print("%d: multithreading supported: %s" % (comm.rank, hvd.mpi_threads_supported()))

    config = tf.ConfigProto()
    # put all processes on GPU 0
    config.gpu_options.visible_device_list = str(0) # str(comm.rank)
    config.gpu_options.allow_growth = True

    # Build model: ||w-x||
    graph = tf.Graph()
    with graph.as_default():
        shape = (3,2)
        w = tf.Variable(uniform(size=shape))
        x = tf.random_normal(shape=shape)
        loss = tf.losses.mean_squared_error(w, x)
        opt = tf.train.AdamOptimizer(0.1)

        # Add Horovod Distributed Optimizer
        opt = hvd.DistributedOptimizer(opt)

        # Make training operation
        train_op = opt.minimize(loss)

    with tf.Session(graph=graph, config=config) as sess:
        print("%d: initialize variables" % comm.rank)
        sess.run(tf.global_variables_initializer())
        # broadcast variables from rank 0 to all other processes during
        # initialization.
        print("%d: broadcast variables" % comm.rank)
        sess.run(hvd.broadcast_global_variables(0))
        for i in range(5):
            print("%d: iteration %d" % (comm.rank, i))
            sess.run(train_op)

    print("%d: hvd.shutdown()" % comm.rank)
    hvd.shutdown()


if "worker" not in sys.argv:
    for i in range(NUM_CYCLES):  # train NUM_CYCLES times
        icomm, merged_comm = spawn()
        merged_comm.barrier()
        merged_comm.barrier()
        sleep(1)  # let the other processes finish before spawning more
else:
    icomm, merged_comm = worker_init()
    merged_comm.barrier()
    train(MPI.COMM_WORLD)
    merged_comm.barrier()
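
To spell out why this sidesteps the hang, as I understand it: in processes created by Spawn, MPI.COMM_WORLD contains only the spawned workers, not the master, so each cycle runs hvd.init()/hvd.shutdown() entirely inside a fresh set of processes that exist only for that cycle. The merged communicator is used just for the two barriers, so the master knows when a cycle has started and finished before spawning the next batch, and the master itself never initializes Horovod.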

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
