Caffe: Errors with multiprocessing in python

Created on 28 Jan 2016 · 24 Comments · Source: BVLC/caffe

I have a pretrained Caffe model and want to use it for prediction. To avoid slowing down the main program, I run the prediction in a separate process using Python's multiprocessing module, but the process appears to freeze when it tries to make the prediction. Is this possible with Caffe? If not, are there other ways to do efficient parallel processing with Caffe in Python?

Here's a simplified version of my code:

def predict(pretrained_network, data):
    prediction = pretrained_network.predict([data])
    return prediction

#Main Program

#Network loaded here 

while True:
    #Grab data
    p = multiprocessing.Process(target=predict, args=(pretrained_network, data))
    p.daemon = True
    p.start()
    #Do other stuff

All 24 comments

I think the problem is that multiprocessing has to pickle objects to send them across separate processes, and I don't think you can pickle a caffe Net like that. Even if you could pickle it, you'd need each Net to point to different GPU memory so that they don't clash.

You could make it work if you loaded the net in the worker process, and used a producer-consumer model. There is no way to get around having each worker load its own copy of the net. What you are doing is in fact making a copy of the net for each process anyway, but through pickling (transparent to the user).

Since loading the net takes a fair amount of time, is there a way to load the net once in a process and call that process repeatedly without reloading the net each time?

Regards,
Rohan

Yes, if you set up your processes in a producer-consumer relationship. The parent process produces data to process, and a persistent worker (that has the net loaded) consumes from the queue (e.g. multiprocessing.Queue).

Or, you could send data in larger batches (e.g. filenames of items to process).
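A minimal sketch of the persistent-worker pattern described above, under stated assumptions: `load_model` is a hypothetical stand-in for the Caffe setup and net loading (it just returns a squaring function so the example runs without Caffe), and `worker_loop` and `STOP` are illustrative names. The point is only that the expensive load happens once, inside the worker, and every later request reuses it.

```python
import multiprocessing as mp

STOP = None  # sentinel telling the worker to exit


def load_model():
    """Hypothetical stand-in for the Caffe setup and net loading."""
    return lambda x: x * x


def worker_loop(tasks, results, loader=load_model):
    model = loader()                 # runs once, inside the worker process
    while True:
        item = tasks.get()           # blocks until the parent sends work
        if item is STOP:
            break
        results.put((item, model(item)))


if __name__ == "__main__":
    tasks, results = mp.Queue(), mp.Queue()
    worker = mp.Process(target=worker_loop, args=(tasks, results))
    worker.start()
    for value in (1, 2, 3):
        tasks.put(value)             # the parent only produces data
    print([results.get(timeout=30) for _ in range(3)])
    tasks.put(STOP)
    worker.join()
```

The parent never touches the model; it only puts plain data on one queue and reads results from the other.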

I've attempted this approach and created a Consumer where the net was loaded. However, the worker (consumer) process is still unable to perform the prediction.

It would be good to pinpoint where exactly it is hanging. Otherwise it's difficult to help remotely debug your app.

Sorry, here's the new sample code

import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #LOADING CAFFE NETWORK CONFIGS
        #omitted for brevity

    def run(self):
        while True:
            image = self.task_queue.get()
            prediction = net.predict([image])
        return

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks, results)
consumer.start()

while True:
    #Grab image from stream
    tasks.put(image)
    #Do other stuff

The program hangs on the line:
prediction = net.predict([image])
It is unable to get past that.

Does it work when not using multiprocessing? I don't have more ideas of how to help you debug it. But if someone else were to help, they'd want to know whether you're using a GPU, and the version numbers of: OS, python, caffe (which commit), and cuDNN (if enabled).

It does work when not using multiprocessing. I've verified that. I just wanted to split the computation off in a separate process because making the prediction in my main program slows it down.

I am using a NVIDIA GeForce GT 650M GPU. CUDA Driver Version: 7.0.64 I am also using Python 2.7.10, OSX 10.9.5. I am unsure as to which commit of Caffe I have, but it is from around August 2015.

Any suggestions for where I can obtain some more help? Would rewriting the program in C++ be helpful? Or recompiling in CPU only mode? I'm pretty stuck here.

You could try the caffe-users group (https://groups.google.com/forum/#!forum/caffe-users).

I usually distribute feature extraction using celery, where each job is given a list of filenames to process, and the job loads the net, processes the images, saves them somewhere (e.g. in redis), and then exits. I'm not sure if that will solve your problem, or just give you a different infrastructure that has the same problem.

You could write it in C++, but you would have to be careful with threading.

Hmm... with the code that you've written there... doesn't Consumer.__init__() run in the parent process? I would think that you want to load the net in the worker process (i.e. inside run()).

I tried loading the net in the worker process just now (before the while loop), but still no luck.

At the start of run(), you load the net, and then you enter a loop where you repeatedly pull from the queue. You create the process once, and then put many items on the queue.

def run(self):
    net = load_net(...)
    while True:
        image = self.task_queue.get()
        prediction = net.predict([image])
    return

Yeah just tried that model, but it still does not execute the prediction step. It is able to get the image from the queue however.

I tried recompiling in CPU_ONLY mode and still face the same issue. I am not sure it is a problem with the GPU, but maybe a bug in pycaffe?

@pythonanonuser were you able to find a workaround? I am also stuck at the same place.

Did you also set GPU mode and do all the other Caffe related code inside the run function? It may not work otherwise... @TheShadow29 can you post the code on pastebin and give a link?

@naibaf7 I am sorry. The place where it was stuck was a very silly mistake on my part (not related to Caffe). After fixing that, using multiprocessing.Process gave the error

F0519 11:43:01.104552 6985 syncedmem.hpp:18] Check failed: error == cudaSuccess (3 vs. 0) initialization error
*** Check failure stack trace: ***

which redirected me to this link. I have yet to try the threading library.

@TheShadow29
Multiprocessing works fine with Caffe. I use it myself, and it's the best option in Python.
But you have to adhere to a strict rule:

  • Everything related to Caffe has to happen strictly within ONE process. You cannot call set_mode_gpu() to switch Caffe to GPU mode in one process and then use Caffe in another process.
  • I recommend using a queue and pulling elements from the queue in the main process. The workers that collect your data can run in other processes.

The error you get occurs because the CUDA (or OpenCL) initialization of Caffe is both thread-local and process-local; it cannot be shared across processes.

Here is a snippet that works:

....
from multiprocessing import Process, Queue, JoinableQueue, cpu_count
....

# Switch Caffe to GPU Mode
caffe.set_mode_gpu()
# Select the first GPU
caffe.set_device(0)

....

queue = Queue()
# With a 4 core 8 thread CPU this would give you 7 processes to gather data and one process to manage the GPU and main loop.
number_of_processes = cpu_count() - 1
workers = [Process(target=DataProviderTask, args=(data_prov,(width_in,height_in),(width_out,height_out))) for i in range(0,number_of_processes)]
for w in workers:
    w.start()

while solver.iter < solver_config.max_iter:
    # Fill the whole batch
    for bidx in range(0,solver.net.blobs['data'].shape[0]):
        # Get a queued sample
        im, gt = queue.get()
        # Load the sample into the network
        solver.net.blobs['data'].data[bidx,:] = im
        solver.net.blobs['label'].data[bidx,:] = gt
    # Train one step
    loss = solver.step(1)

Note that the workers here are separate processes, and Caffe is NEVER used/referred to in the workers! That is important.

A smart worker that fills the queue could look something like this:

import time

def DataProviderTask(data_prov, in_size, out_size):
    # `queue` is the module-level Queue created above, inherited by the forked workers
    backoff = 0.1
    while True:
        # Keep roughly 50 samples queued; otherwise sleep with exponential backoff
        if queue.qsize() < 50:
            # Pick a random sample for training
            im, gt = data_prov.draw_train_sample(in_size[0], in_size[1], out_size[0], out_size[1])
            queue.put((im, gt))
            backoff = 0.1
        else:
            time.sleep(backoff)
            backoff *= 2
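As an alternative to polling `qsize()` with a backoff (this is a swapped-in technique, not the commenter's method), a bounded `multiprocessing.Queue` makes `put()` block while the queue is full. This may matter on macOS, where the Python documentation notes that `qsize()` can raise `NotImplementedError`. The names `try_enqueue`, `provider_task`, and `draw_sample` below are hypothetical, and the queue is passed as an argument rather than read from a global.

```python
import multiprocessing as mp
import queue  # only needed for the queue.Full exception


def try_enqueue(sample_queue, sample):
    """Non-blocking put; returns False when the bounded queue is full."""
    try:
        sample_queue.put_nowait(sample)
        return True
    except queue.Full:
        return False


def provider_task(sample_queue, draw_sample):
    """Worker loop: put() blocks while the queue is full, so no sleep is needed."""
    while True:
        sample_queue.put(draw_sample())


# Usage (in the main process): sample_queue = mp.Queue(maxsize=50)
```

With `maxsize=50` the queue depth is capped the same way as in the snippet above, but idle workers sleep inside `put()` instead of waking up to re-check the size.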

@naibaf7 Thanks a lot for your comprehensive explanation.

Everything related to Caffe has to happen strictly within ONE process. You cannot call set_mode_gpu() to switch Caffe to GPU mode in one process and then use Caffe in another process.

I didn't know this, and it was the main cause of my error. Thanks for pointing it out. If I may ask, why is this so?

Apart from that, the speedup was quite remarkable. My main task was to use a pretrained AlexNet for label prediction. For 3650 test images, it took 639 seconds without multiprocessing and consumed 256 MB of GPU memory (as shown by nvidia-smi). With multiprocessing, it took 46 seconds and consumed 526 MB.
Initially, I used a simple bash script to run the Python file in 10 different terminals. This resulted in testing 365000 images and was able to use 256*10 MB of GPU memory. What I actually wanted multiprocessing to do was something similar, but even though the speedup is great and the approach is more efficient, it is still only able to use 526 MB of GPU memory. To go further, I would need to make, say, 10 copies of the net and run them in parallel. But as you stated, I can't really do that. What are some good workarounds?

@TheShadow29
You can't use Caffe objects in more than one process because of how Python is designed, how Python interacts with C++, and how the GPU is addressed inside the C++ code of Caffe.
Basically, in both Caffe and TensorFlow, there exists a thread-local session, which keeps all the state and the pointers to the GPU device and memory. These objects are protected from access by other processes, which run independently. Two Python processes, even though you launch them from one program, are separate (you can see that in your task manager!), and data transfer between them happens through pickling (see: https://docs.python.org/2/library/pickle.html), which is basically object serialization. You can't serialize the resources owned by the C++ code of Caffe, especially not the GPU initialization...
Python also doesn't really run two threads in parallel within one process, because of the GIL (https://wiki.python.org/moin/GlobalInterpreterLock). So threads give no true speedup for Python code, and multiprocessing is the way to go. (Two threads could both access Caffe at the same time though, in theory, and if done right!)
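To illustrate the pickling point, here is a small sketch. `NativeHandle` is a hypothetical stand-in for any Python object that owns a raw pointer; it is not a Caffe class, and the exact error a real Net raises may differ. Plain data pickles fine, while the pointer-holding object does not, which is why only data (not the net) should be sent through a queue.

```python
import ctypes
import pickle


class NativeHandle:
    """Hypothetical stand-in for an object that owns a raw native pointer."""

    def __init__(self):
        self.ptr = ctypes.pointer(ctypes.c_int(0))


def can_pickle(obj):
    """Return True if obj can be serialized for transfer to another process."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, ValueError):
        return False


if __name__ == "__main__":
    print(can_pickle(([0.5, 0.25], 3)))   # plain data, e.g. a sample and its label
    print(can_pickle(NativeHandle()))     # object wrapping a native pointer
```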

Hm, it's weird that it's using twice as much RAM.
Well, usually you can just increase the batch size of your AlexNet to process more images per batch.
And then load the images into a queue on multiple processes if one process is not fast enough to fill the queue.
So usually it's not necessary to load multiple networks, except if you want to use multiple GPUs for parallel inference. A big enough batch size and loading the data fast enough should be sufficient in most cases. You can test the theoretical limit of your GPU speed by using the Caffe built-in benchmark and a DummyData layer.

If the speed is limited by data preprocessing, you can also consider making a new Caffe GPU layer that will do the preprocessing faster.

Anyway, if you still think it's the right idea to load 10 networks, you can do this.
What you need is to load Caffe once per process, and also call set_mode_gpu() and load the network once per process. You could launch these processes from a master process that communicates with the Caffe worker processes. I haven't tried this myself yet, but maybe the subprocess module is the right starting point: https://docs.python.org/2/library/subprocess.html.
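A rough sketch of that master/worker layout using `subprocess`, under stated assumptions: the worker script, the JSON-lines protocol over stdin/stdout, and the names `WORKER_SOURCE`, `start_workers`, `run_batch`, and `stop_workers` are all hypothetical. The stand-in model doubles its input so the example runs without Caffe; in real use each worker would do its own Caffe setup and load its own net at the marked line.

```python
import json
import subprocess
import sys

# Source of one worker, run in a fresh interpreter.
WORKER_SOURCE = r"""
import json, sys
worker_id = int(sys.argv[1])
model = lambda x: 2 * x   # stand-in for per-process Caffe setup and net loading
for line in sys.stdin:    # one JSON request per line until stdin is closed
    item = json.loads(line)
    reply = {"worker": worker_id, "input": item, "output": model(item)}
    print(json.dumps(reply), flush=True)
"""


def start_workers(count):
    """Launch `count` independent worker interpreters."""
    return [subprocess.Popen([sys.executable, "-c", WORKER_SOURCE, str(i)],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             text=True)
            for i in range(count)]


def run_batch(workers, items):
    """Send one item to each worker per round, then collect the replies in order."""
    results = []
    for start in range(0, len(items), len(workers)):
        chunk = items[start:start + len(workers)]
        for proc, item in zip(workers, chunk):
            proc.stdin.write(json.dumps(item) + "\n")
            proc.stdin.flush()
        for proc, _ in zip(workers, chunk):
            results.append(json.loads(proc.stdout.readline()))
    return results


def stop_workers(workers):
    """Close stdin so each worker's read loop ends, then wait for it to exit."""
    for proc in workers:
        proc.stdin.close()
        proc.wait()
        proc.stdout.close()
```

Each round keeps every worker busy at once, and because the workers are separate interpreters, nothing Caffe-related would ever live in the master.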

@naibaf7 I don't see the same speedups with multiprocessing either. I believe I raised an issue about this before, but it was closed.

@naibaf7 That explains quite a lot. I will look into those methods for speed up. Thanks a lot.

@TheShadow29 No problem :)
@pythonanonuser Under the right circumstances, and when used as in my example above, it can give massive speedups. In my case, I need to filter and rotate the input images to generate a larger training data set on the fly (too large to be cached). This usually takes 100 ms per image, but the network only takes 50 ms per forward pass. So that the GPU doesn't get bored, I have to do it in parallel on 7 worker processes, while limiting the queue depth to 50 elements and using an increasing backoff delay so the workers don't spin in a no-operation loop.
Another workaround would of course be to implement rotation and filtering on the GPU, but that's more work.

@naibaf7 see my open issue #4730 for an accurate description of the behavior I am experiencing
