I'm working on an application in which worker processes are spawned dynamically from the master process. I can train a network with the first set of spawned processes (together with the master), but I can't do it a second time after calling hvd.shutdown() and spawning new processes, even though the new processes can still communicate with the master. I've tried various permutations of calling Free and Disconnect on the communicators, with no luck.
It works fine if all of the processes are started at the beginning and none are spawned. (Note that, in this case, hvd.shutdown() causes a segfault if comm (even MPI.COMM_WORLD) is not passed explicitly to hvd.init.)
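To be concrete about that last point, this is what I mean by passing the communicator explicitly (just a minimal sketch of the initialization; the full MWE below does the same thing with the merged communicator):
from mpi4py import MPI
import horovod.tensorflow as hvd

# passing an mpi4py communicator explicitly; a bare hvd.init() is what
# leads to the segfault on hvd.shutdown() in my setup
hvd.init(MPI.COMM_WORLD)
# ... build the graph and train ...
hvd.shutdown()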
I've written a minimal working example (MWE):
import sys
import os
from numpy.random import uniform
import tensorflow as tf
from mpi4py import MPI
import horovod.tensorflow as hvd

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# the 'worker' arg is passed to the worker, to distinguish it from the master
# call with
# nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py


def send_and_print_msg(comm, msg):
    comm.bcast(msg, root=comm.rank)
    print("%d: sent message: %s" % (comm.rank, msg))


def receive_and_print_msg(comm, root):
    msg = None
    msg = comm.bcast(msg, root)
    print("%d: received message from %d: %s" % (comm.rank, root, msg))


def spawn():
    # spawn one worker process
    print("Spawning worker processes.")
    icomm = MPI.COMM_SELF.Spawn("python3", maxprocs=1, args=['mwe_dynamic.py', 'worker'])
    # merge the parent's world with the children's world, putting the parent before the children
    comm = icomm.Merge(False)
    print("Rank %d, PID %s" % (comm.rank, os.getpid()))
    # test comm
    send_and_print_msg(comm, os.getpid())
    receive_and_print_msg(comm, 1)
    return icomm, comm


def worker_init():
    print("Getting parent comm.")
    icomm = MPI.Comm.Get_parent()
    # merge the parent's world with the children's world, putting the parent before the children
    comm = icomm.Merge(True)
    print("Rank %d, PID %s" % (comm.rank, os.getpid()))
    # test comm
    receive_and_print_msg(comm, 0)
    send_and_print_msg(comm, os.getpid())
    return icomm, comm


def train(comm):
    # Initialize Horovod
    hvd.init(comm)
    print("%d: multithreading supported: %s" % (comm.rank, hvd.mpi_threads_supported()))
    config = tf.ConfigProto()
    # put them both on GPU 0
    config.gpu_options.visible_device_list = str(0)  # str(comm.rank)
    config.gpu_options.allow_growth = True

    # Build model: ||w-x||
    graph = tf.Graph()
    with graph.as_default():
        shape = (3, 2)
        w = tf.Variable(uniform(size=shape))
        x = tf.random_normal(shape=shape)
        loss = tf.losses.mean_squared_error(w, x)
        opt = tf.train.AdamOptimizer(0.1)
        # Add Horovod Distributed Optimizer
        opt = hvd.DistributedOptimizer(opt)
        # Make training operation
        train_op = opt.minimize(loss)

    with tf.Session(graph=graph, config=config) as sess:
        print("%d: initialize variables" % comm.rank)
        sess.run(tf.global_variables_initializer())
        # broadcast variables from rank 0 to all other processes during
        # initialization.
        print("%d: broadcast variables" % comm.rank)
        sess.run(hvd.broadcast_global_variables(0))
        # test comm again
        if comm.rank == 0:
            send_and_print_msg(comm, os.getpid())
            receive_and_print_msg(comm, 1)
        elif comm.rank == 1:
            receive_and_print_msg(comm, 0)
            send_and_print_msg(comm, os.getpid())
        for i in range(5):
            print("%d: iteration %d" % (comm.rank, i))
            sess.run(train_op)  # processes hang here in the second run

    print("%d: hvd.shutdown()" % comm.rank)
    hvd.shutdown()
    # print("%d: comm.Free()" % comm.rank)
    # even before hvd.shutdown(): mpi4py.MPI.Exception: MPI_ERR_COMM: invalid communicator
    # comm.Free()
    # print("%d: comm.Disconnect()" % comm.rank)
    # processes hang here if this is called, before or after hvd.shutdown()
    # comm.Disconnect()
    # print("%d: icomm.Free()" % comm.rank)
    # makes no difference
    # icomm.Free()
    # print("%d: icomm.Disconnect()" % comm.rank)
    # makes no difference
    # icomm.Disconnect()


if "worker" not in sys.argv:
    for i in range(2):
        icomm, comm = spawn()
        train(comm)
else:
    icomm, comm = worker_init()
    train(comm)
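To summarize the commented-out attempts at the end of train(): comm.Free() raises mpi4py.MPI.Exception: MPI_ERR_COMM: invalid communicator even when called before hvd.shutdown(), comm.Disconnect() hangs whether it's called before or after hvd.shutdown(), and freeing or disconnecting icomm makes no difference either way.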
Here's the command line output:
nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py
Spawning worker processes.
Getting parent comm.
Rank 0, PID 11
Rank 1, PID 24
0: sent message: 11
1: received message from 0: 11
0: received message from 1: 24
1: sent message: 24
1: multithreading supported: True
0: multithreading supported: True
1: initialize variables
0: initialize variables
1: broadcast variables
0: broadcast variables
1: received message from 0: 11
0: sent message: 11
1: sent message: 24
1: iteration 0
0: received message from 1: 24
0: iteration 0
0d28c03512b9:11:37 [0] misc/ibvwrap.cu:61 WARN Failed to open libibverbs.so[.1]
0d28c03512b9:11:37 [0] INFO Using internal Network Socket
0d28c03512b9:11:37 [0] INFO Using NCCL Low-latency algorithm for sizes below 16384
0d28c03512b9:11:37 [0] INFO NET : Using interface eth0:172.17.0.2<0>
0d28c03512b9:11:37 [0] INFO NET/Socket : 1 interfaces found
NCCL version 2.2.13+cuda9.0
0d28c03512b9:24:38 [0] misc/ibvwrap.cu:61 WARN Failed to open libibverbs.so[.1]
0d28c03512b9:24:38 [0] INFO Using internal Network Socket
0d28c03512b9:24:38 [0] INFO Using NCCL Low-latency algorithm for sizes below 16384
0d28c03512b9:24:38 [0] INFO comm 0x7ff58801b260 rank 1 nranks 2
0d28c03512b9:24:38 [0] INFO NET : Using interface eth0:172.17.0.2<0>
0d28c03512b9:24:38 [0] INFO NET/Socket : 1 interfaces found
0d28c03512b9:11:37 [0] INFO comm 0x7fc31401b830 rank 0 nranks 2
0d28c03512b9:11:37 [0] INFO Using 256 threads
0d28c03512b9:11:37 [0] INFO Min Comp Cap 6
0d28c03512b9:11:37 [0] INFO NCCL_SINGLE_RING_THRESHOLD=131072
0d28c03512b9:11:37 [0] INFO Ring 00 : 0 1
0d28c03512b9:24:38 [0] INFO Ring 00 : 1[0] -> 0[0] via P2P/IPC
0d28c03512b9:11:37 [0] INFO Ring 00 : 0[0] -> 1[0] via P2P/IPC
0d28c03512b9:11:37 [0] INFO Launch mode Parallel
1: iteration 1
0: iteration 1
1: iteration 2
0: iteration 2
1: iteration 3
0: iteration 3
1: iteration 4
0: iteration 4
1: hvd.shutdown()
0: hvd.shutdown()
Spawning worker processes.
Getting parent comm.
Rank 0, PID 11
Rank 1, PID 132
0: sent message: 11
1: received message from 0: 11
0: received message from 1: 132
1: sent message: 132
0: multithreading supported: True
1: multithreading supported: True
0: initialize variables
0: broadcast variables
1: initialize variables
1: broadcast variables
0: sent message: 11
1: received message from 0: 11
1: sent message: 132
0: received message from 1: 132
0: iteration 0
1: iteration 0
The processes hang here.
@michaelstjules, were you able to resolve your issue per our discussion? If so, could you write up a solution that worked for you?
@alsrgv Yes, training only on the spawned processes using MPI.COMM_WORLD works:
import sys
import os
from numpy.random import uniform
import tensorflow as tf
from mpi4py import MPI
import horovod.tensorflow as hvd
from time import sleep

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

NUM_WORKERS = 3
NUM_CYCLES = 3

# the 'worker' arg is passed to the workers, to distinguish them from the master
# call with
# nvidia-docker run -it --rm -v $PWD:/Horovod -w /Horovod "$IMAGE" mpiexec --allow-run-as-root -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -n 1 python3 mwe_dynamic.py


def spawn():
    # spawn the worker processes
    print("Spawning worker processes.")
    icomm = MPI.COMM_SELF.Spawn("python3", maxprocs=NUM_WORKERS, args=['mwe_dynamic.py', 'worker'])
    # merge the parent's world with the children's world, putting the parent before the children
    merged_comm = icomm.Merge(False)
    comm_world = MPI.COMM_WORLD
    print("Master: comm_world rank %d of %d, PID %s" % (comm_world.rank, comm_world.size, os.getpid()))
    print("Master: merged_comm rank %d of %d, PID %s" % (merged_comm.rank, merged_comm.size, os.getpid()))
    return icomm, merged_comm


def worker_init():
    print("Getting parent comm.")
    icomm = MPI.Comm.Get_parent()
    # merge the parent's world with the children's world, putting the parent before the children
    merged_comm = icomm.Merge(True)
    comm_world = MPI.COMM_WORLD
    print("Worker: comm_world rank %d of %d, PID %s" % (comm_world.rank, comm_world.size, os.getpid()))
    print("Worker: merged_comm rank %d of %d, PID %s" % (merged_comm.rank, merged_comm.size, os.getpid()))
    return icomm, merged_comm


def train(comm):
    print("%d: Initializing Horovod." % comm.rank)
    hvd.init(comm)
    print("%d: multithreading supported: %s" % (comm.rank, hvd.mpi_threads_supported()))
    config = tf.ConfigProto()
    # put all of the workers on GPU 0
    config.gpu_options.visible_device_list = str(0)  # str(comm.rank)
    config.gpu_options.allow_growth = True

    # Build model: ||w-x||
    graph = tf.Graph()
    with graph.as_default():
        shape = (3, 2)
        w = tf.Variable(uniform(size=shape))
        x = tf.random_normal(shape=shape)
        loss = tf.losses.mean_squared_error(w, x)
        opt = tf.train.AdamOptimizer(0.1)
        # Add Horovod Distributed Optimizer
        opt = hvd.DistributedOptimizer(opt)
        # Make training operation
        train_op = opt.minimize(loss)

    with tf.Session(graph=graph, config=config) as sess:
        print("%d: initialize variables" % comm.rank)
        sess.run(tf.global_variables_initializer())
        # broadcast variables from rank 0 to all other processes during
        # initialization.
        print("%d: broadcast variables" % comm.rank)
        sess.run(hvd.broadcast_global_variables(0))
        for i in range(5):
            print("%d: iteration %d" % (comm.rank, i))
            sess.run(train_op)

    print("%d: hvd.shutdown()" % comm.rank)
    hvd.shutdown()


if "worker" not in sys.argv:
    for i in range(NUM_CYCLES):  # train NUM_CYCLES times
        icomm, merged_comm = spawn()
        merged_comm.barrier()
        merged_comm.barrier()
        sleep(1)  # let the other processes finish before spawning more
else:
    icomm, merged_comm = worker_init()
    merged_comm.barrier()
    train(MPI.COMM_WORLD)
    merged_comm.barrier()
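As far as I can tell, the key difference is that the long-lived master never calls hvd.init at all: each MPI.COMM_SELF.Spawn creates a fresh set of workers with their own MPI_COMM_WORLD, so Horovod is initialized and shut down entirely inside processes that exit once training is done. The master only synchronizes with them through the two barriers on the merged communicator (the first lets the workers start training, the second tells the master they've finished), and the sleep(1) gives them time to exit before the next batch is spawned.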