Maskrcnn-benchmark: How does the `synchronize` function work?

Created on 18 Nov 2018 · 29 Comments · Source: facebookresearch/maskrcnn-benchmark

โ“ Questions and Help

I found that in your code you use a synchronize helper function to synchronize GPUs. I am not familiar with the usage of torch.distributed.deprecated, and I am trying to understand how the following code actually works.

import time

import torch


def synchronize():
    """
    Helper function to synchronize between multiple processes when
    using distributed training
    """
    if not torch.distributed.deprecated.is_initialized():
        return
    world_size = torch.distributed.deprecated.get_world_size()
    rank = torch.distributed.deprecated.get_rank()
    if world_size == 1:
        return

    def _send_and_wait(r):
        if rank == r:
            tensor = torch.tensor(0, device="cuda")
        else:
            tensor = torch.tensor(1, device="cuda")
        torch.distributed.deprecated.broadcast(tensor, r)
        while tensor.item() == 1:
            time.sleep(1)

    _send_and_wait(0)
    # now sync on the main process
    _send_and_wait(1)

I have some questions:

  1. Will broadcast block the main process until all other processes have received the tensor? What is the behavior of this function on the different kinds of processes?
  2. Why use _send_and_wait(1) at the end of this function? I know that rank 0 is the master, but what is special about rank 1?
question

Most helpful comment

Yes, this happens because of the lazy initialization in the NCCL backend. The faster process will try to create a new NCCL communicator and is waiting for the slower process to do the same. This times out after 5 minutes. This timeout is set on the k/v store (be it a file backed store or TCP store where a single process acts as server) and is currently not configurable.

This is a dup of pytorch/pytorch#16225 so this one can be closed and we can continue discussion there.

All 29 comments

Hi,

I believe that with the NCCL backend broadcast is an asynchronous function, so it doesn't block the main process. This is why I use the while tensor.item() == 1: time.sleep(1) loop in there.
The reason why I do _send_and_wait twice is that when sending on process 0, I can enforce that all other processes will sync, but not process 0 itself.
Then, I launch it on another process (say 1) to make sure that 0 also waits for it and is in sync.
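To make the ordering concrete, here is an annotated copy of the helper above. The logic is unchanged; only the imports it needs and comments reflecting this explanation are added, so treat it as a reading aid rather than a new implementation.

import time

import torch


def synchronize():
    """Annotated copy of the helper above; comments reflect the explanation in this thread."""
    if not torch.distributed.deprecated.is_initialized():
        return
    world_size = torch.distributed.deprecated.get_world_size()
    rank = torch.distributed.deprecated.get_rank()
    if world_size == 1:
        return

    def _send_and_wait(r):
        if rank == r:
            # the sending rank starts with 0, so its own wait loop below exits immediately
            tensor = torch.tensor(0, device="cuda")
        else:
            # every other rank starts with 1 and spins until the (asynchronous) broadcast
            # from rank r overwrites it with 0
            tensor = torch.tensor(1, device="cuda")
        torch.distributed.deprecated.broadcast(tensor, r)
        while tensor.item() == 1:
            time.sleep(1)

    # first pass: every rank except 0 waits for rank 0
    _send_and_wait(0)
    # second pass: every rank except 1 (including rank 0) waits for rank 1
    _send_and_wait(1)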

Let me know if you have other questions.

What if the processes with rank 0 and 1 run much faster than the other processes? In this case, the other processes can never catch up with those two.

I believe once process 0 broadcasts, all other processes but itself will wait. I've never experienced deadlocks with the current version of my code, but I might be missing something here.
Are you experiencing deadlocks?

Well, I didn't experience that. Do you mean that other processes will wait until all of them receive those tensors broadcast from process 0?

Because of the wait in the function, I believe all processes except 0 will wait to receive the tensor, and then we make all processes except 1 wait as well

Is it possible that the processes other than 0 and 1 run at a very slow speed? Then both process 0 and process 1 finish the synchronize function and return, while the others haven't stepped into the synchronize function yet (because they are too slow). In this case, processes 0 and 1 will keep going.

Maybe that could be a possibility.
In this case, we might indeed want to run the wait ngpu times.

I think that c10d (the new distributed backend) might add some new primitives, which might be worth considering using.

Okay, thank you for your generous help and quick reply.

Thanks for your questions, they indeed made me realise that there can be sync problems with the current implementation. I'm keeping the issue open to keep track of it.

When I was running the inference code, an error would sometimes happen.
The error message was as follows; it is perhaps caused by this issue.

Traceback (most recent call last):
  File "tools/test_net.py", line 100, in <module>
    main()
  File "tools/test_net.py", line 94, in main
    output_folder=output_folder,
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/engine/inference.py", line 106, in inference
    synchronize()
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/utils/comm.py", line 62, in synchronize
    _send_and_wait(0)
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/utils/comm.py", line 58, in _send_and_wait
    torch.distributed.broadcast(tensor, r)
  File "/home/yelantf/codes/myenv_torch1.0/lib/python3.5/site-packages/torch/distributed/distributed_c10d.py", line 737, in broadcast
    work = _default_pg.broadcast([tensor], opts)
RuntimeError: Resource temporarily unavailable

I notice that the code currently uses torch.distributed instead of torch.distributed.deprecated, and that barrier is now supported in PyTorch for the NCCL backend. Maybe we could just use torch.distributed.barrier to synchronize all processes?

Yes, I would like to try replacing it with torch.distributed.barrier, but I'd need to perform some checks to see whether it does indeed do the same thing.
Could you try it out and send a PR if it works?
Thanks!

I tried that, and torch.distributed.barrier is exactly what we want. However, the RuntimeError mentioned above is still raised. After some investigation, I found that if some other process runs much faster than process 0 and has to wait for process 0 for around 5 minutes, the error is raised. To reproduce, run the following code with the command CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 debug_dist.py:

# debug_dist.py
import argparse
import os
import time

import torch
import torch.nn as nn

def synchronize():
    """
    Helper function to synchronize between multiple processes when
    using distributed training
    """
    if not torch.distributed.is_available():
        return
    if not torch.distributed.is_initialized():
        return
    world_size = torch.distributed.get_world_size()
    if world_size == 1:
        return
    torch.distributed.barrier()

def test_inf(caltime):
    rank = torch.distributed.get_rank()
    cpu_device = torch.device("cpu")
    res_list = []
    model = nn.Conv3d(10, 1024, 3).to('cuda')
    model.eval()
    for i in range(caltime):
        input = torch.rand(3, 10, 27, 27, 27, device='cuda')
        with torch.no_grad():
            output = model(input).mean()
            output = output.to(cpu_device)
            res_list.append(output)
        if i%1000==0 and rank==0:
            print(i)
    return res_list

def main():
    parser = argparse.ArgumentParser(description="Debug for distributed inference.")
    parser.add_argument("--local_rank", type=int, default=0)
    args = parser.parse_args()

    num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1
    distributed = num_gpus > 1

    if distributed:
        torch.cuda.set_device(args.local_rank)
        torch.distributed.init_process_group(
            backend="nccl", init_method="env://"
        )

    rank = torch.distributed.get_rank()
    assert rank == args.local_rank, "damn!"

    caltime = 1000

    res = test_inf(caltime)

    if rank == 0:
        # make rank 0 lag behind, so the other rank waits at the barrier for more than 5 minutes
        time.sleep(330)

    synchronize()

if __name__ == "__main__":
    main()

I haven't found any workaround yet.

I don't know about this problem; there might be a timeout in barrier that is raising the error. What is the error message, by the way?

The message is quite similar to the one mentioned in the comments above, except that _default_pg.broadcast is replaced by _default_pg.barrier.

@pietern do you know by chance when such an error might happen?

Traceback (most recent call last):
  File "tools/test_net.py", line 100, in <module>
    main()
  File "tools/test_net.py", line 94, in main
    output_folder=output_folder,
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/engine/inference.py", line 106, in inference
    synchronize()
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/utils/comm.py", line 62, in synchronize
    _send_and_wait(0)
  File "/home/yelantf/codes/faster_rcnn/maskrcnn_benchmark/utils/comm.py", line 58, in _send_and_wait
    torch.distributed.broadcast(tensor, r)
  File "/home/yelantf/codes/myenv_torch1.0/lib/python3.5/site-packages/torch/distributed/distributed_c10d.py", line 737, in broadcast
    work = _default_pg.broadcast([tensor], opts)
RuntimeError: Resource temporarily unavailable

Yes, this happens because of the lazy initialization in the NCCL backend. The faster process will try to create a new NCCL communicator and is waiting for the slower process to do the same. This times out after 5 minutes. This timeout is set on the k/v store (be it a file backed store or TCP store where a single process acts as server) and is currently not configurable.

This is a dup of pytorch/pytorch#16225 so this one can be closed and we can continue discussion there.

Since it is about the initialization, maybe we can call torch.distributed.barrier() right after torch.distributed.init_process_group? I tried that, and the error disappeared. I think this should be a workaround.
@fmassa @pietern
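A minimal sketch of this workaround (a hypothetical standalone script, assuming the same torch.distributed.launch setup as debug_dist.py above):

import argparse

import torch

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=0)
args = parser.parse_args()

torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend="nccl", init_method="env://")
# the first collective creates the NCCL communicator while all ranks are still
# close together in time; later calls to torch.distributed.barrier() reuse it,
# so the ~5 minute store timeout is never hit during a long, uneven inference run
torch.distributed.barrier()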

@yelantingfeng oh wow, so there is a large time spent in init_process_group, which varies a lot across machines/GPUs?

No, that's not the case. The time spent on inference is very different for different processes in some cases. I gathered that the error happens when a process tries to create a new NCCL communicator, so I tried to create the communicator at the very beginning. I thought that no further communicator would need to be created then, and thus no error would be raised.

So your implementation differs quite a bit from what we currently have in maskrcnn-benchmark?

Not a lot, just several lines.

  1. Add a synchronize right after torch.distributed.init_process_group in train_net.py and test_net.py.
  2. Use torch.distributed.barrier instead of torch.distributed.broadcast in the synchronize function.

But what was the case where the errors appeared? Is your dataset different from COCO? Just so that I understand in which context this error happened, and why the processes have wildly different speeds. Do you have different GPUs?

Oh, yes, I didn't use COCO, and the error did not always happen. It happens more frequently when I run multiple copies of the inference code on a single machine at the same time. All 8 GPUs are the same. I use 0-3 to run one copy of the code and 4-7 to run another. I think maybe the cause is the high CPU workload.

BTW, would you like to synchronize right after torch.distributed.init_process_group? I'd like to send a PR where we use torch.distributed.barrier together with that change.

I don't see any particular problem with it, so why not.

But it is still a bit mysterious to me. From your previous explanation it just seems like a workaround, as it reduces the overall waiting time to under 5 minutes but does not really fix the underlying problem, I think, and we would need to add additional sync points in the middle of the code in the future if something gets blocked again.

Running synchronize right after init_process_group forces initialization of the underlying NCCL communicator to happen immediately, instead of after this first step. Then when the duration of this first step varies by more than 5 minutes it won't hit the timeout, but rather blindly uses the existing NCCL communicator. This timeout is not yet configurable from Python and I added https://github.com/pytorch/pytorch/issues/16520 to fix this. If this were configurable from Python you could pass a higher timeout value to init_process_group depending on how long the time to the first call to synchronize is.

That said, executing synchronization right after init_process_group is a reasonable fix and doesn't hurt.
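For reference, once that issue lands, the call could look something like the sketch below (this assumes a PyTorch version where init_process_group accepts a timeout argument; the 30-minute value is only an example):

import datetime

import torch

# hypothetical usage on a PyTorch version where the timeout argument is supported:
# allow up to 30 minutes of skew before the first collective instead of ~5
torch.distributed.init_process_group(
    backend="nccl",
    init_method="env://",
    timeout=datetime.timedelta(minutes=30),
)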

Fixed by @yelantingfeng in #393
