Pyzmq: memory keeps on growing for zmq.PUB socket sending images

Created on 14 Feb 2019 · 6 Comments · Source: zeromq/pyzmq

Hello everyone,
I am trying to code a publisher that takes images from a simulation through APIs and publishes them through a TCP socket, so that multiple processes (not necessarily on the same machine) can access the images by subscribing to the topic. Everything seems to work, since I can see the streamed images via the subscriber. The only issue is that the memory associated with the publisher keeps growing indefinitely, both with and without the subscriber active.
Here is how the publisher is used:

import time

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5556")
topic = "img_rgba"
# Give subscribers time to connect before the first publish.
time.sleep(2)

while True:
    # [...] obtain the next image from the simulation API into `image`
    socket.send_string(topic, zmq.SNDMORE)
    # send_pyobj pickles the whole object into a new message on every call.
    socket.send_pyobj(image)

Do you have any clue?

Thanks a lot,
Marco

Most helpful comment

I am encountering the same problem, in the scenario of having exactly 1 PUB and 1 SUB socket.

The scenario is as follows:

  • Process with PUB socket is publishing data
  • Process with SUB socket is receiving this data, but is not reading it immediately
  • Process with PUB socket is done, and stops publishing
  • Process with SUB socket start(/continues) receiving data, and receives all messages.

When this scenario is done, the SUB process does not release all the memory.

I wrote the following test program which reproduces the issue.

#!/usr/bin/env python3
""" Contains test with both sub and pub in different threads """

import platform
import sys
import threading
import time
import zmq

#: Big binary object, with the sample data being sent: 10 MiB of b'0'.
BIG_B = b'0' * (10 << 20)

#: High water mark option for PUB and SUB socket. ZMQ counts it in
#: messages, not bytes: with 10 MiB messages a full queue can hold
#: roughly 1000 x 10 MiB ~= 10 GiB per socket.
HWM = 1000

#: Number of messages to be generated and sent. This is below HWM, so none
#: of them should be dropped: ~500 x 10 MiB ~= 4.9 GiB ends up queued while
#: the SUB side is not reading.
NUM_TEST_MESSAGES = 500

class SubClass():
    """ Thread class with SUB socket

    The SUB socket and its context are created, used and cleaned up inside
    the worker thread started by ``start()``. The controlling thread asks
    for messages with ``receiveMessages()`` and ends the worker with
    ``stop()``.
    """

    def __init__(self):
        self.subSocket              = None
        self.subContext             = None
        self.subThread              = None
        # Set by the controller to wake the worker (new request or stop)
        self.subActionEvent         = threading.Event()
        # Set by the worker once the requested messages have been received
        self.subActionEventDone     = threading.Event()
        self.stopEvent              = threading.Event()

        self.messagesToReceive      = 0
        self.messagesReceivedTotal  = 0
        # Sequence number expected next; -1 until the first message arrives
        self.nextReceive = -1

        # Guards the counters updated by both controller and worker thread
        self.tLock = threading.Lock()

    def start(self):
        """ start threaded SUB """
        self.subThread = threading.Thread(target=self._runSub, name="SubThread")
        self.subThread.start()

    def stop(self):
        """ stop threaded SUB and wait for the worker thread to exit

        The stop flag is raised *before* the worker is woken. With the
        opposite order the worker could handle the wake-up, clear it and go
        back to ``wait()`` before the flag was set, so ``join()`` could hang.
        """
        self.stopEvent.set()
        self.subActionEvent.set()
        self.subThread.join()

    def _runSub(self):
        """ SUB thread's main function, receives N messages, when instructed

        On exit (normal or exceptional) the socket is closed, the context is
        terminated, and ``subActionEventDone`` is set so that a caller
        blocked in ``receiveMessages()`` is never left waiting forever.
        """
        try:
            self.subContext = zmq.Context()
            self.subSocket  = self.subContext.socket(zmq.SUB)
            # HWM only applies to connections made after it is set
            self.subSocket.set_hwm(HWM)
            self.subSocket.connect("tcp://localhost:5555")
            self.subSocket.subscribe('')
            while not self.stopEvent.is_set():
                self.subActionEvent.wait()
                if self.stopEvent.is_set():
                    break
                while self.messagesToReceive:
                    topic, tag, binMessage = self.subSocket.recv_multipart()
                    # Only the tag is used; release the (large) payload now
                    # instead of keeping the last one alive while idle.
                    del topic, binMessage

                    # Check the sequence counter in the received messages, and print if there is
                    # a mismatch
                    currentReceive = int(tag.decode())
                    with self.tLock:
                        if self.nextReceive != -1 and currentReceive != self.nextReceive:
                            print(
                                "Counter mismatch, expected {}, received {}".format(
                                    self.nextReceive, currentReceive))
                        self.nextReceive            = currentReceive + 1
                        self.messagesToReceive     -= 1
                        self.messagesReceivedTotal += 1
                print("Received {} messages in total".format(self.messagesReceivedTotal))
                self.subActionEvent.clear()
                self.subActionEventDone.set()
        finally:
            if self.subSocket is not None:
                # Closing the socket releases the messages still queued in it;
                # linger=0 keeps term() from waiting on pending outgoing
                # (subscription) traffic.
                self.subSocket.close(linger=0)
                self.subSocket = None
            if self.subContext is not None:
                # Terminate the context so its I/O threads are released too.
                self.subContext.term()
                self.subContext = None
            self.subActionEventDone.set()

    def receiveMessages(self, numMessages):
        """ receive N messages (in SUB thread) and block until they arrived

        ``subActionEventDone`` is cleared *before* the worker is woken: it
        is still set from the previous request, so without the clear a
        second call could return before any message was received.
        """
        self.subActionEventDone.clear()
        with self.tLock:
            self.messagesToReceive += numMessages
        self.subActionEvent.set()
        self.subActionEventDone.wait()


class PubClass:
    """ Thread class with PUB socket

    The PUB socket and its context are created, used and cleaned up inside
    the worker thread started by ``start()``. The controlling thread asks
    for messages to be published with ``send()`` and ends the worker with
    ``stop()``.
    """
    def __init__(self):
        self.pubSocket          = None
        self.pubContext         = None
        self.pubThread          = None
        # Set by the controller to wake the worker (new request or stop)
        self.pubActionEvent     = threading.Event()
        # Set by the worker once the requested messages have been sent
        self.pubActionEventDone = threading.Event()
        self.stopEvent          = threading.Event()

        self.messagesToSend     = 0
        # Also used as the sequence tag of the next published message
        self.messagesSendTotal  = 0

        # Guards the counters updated by both controller and worker thread
        self.tLock = threading.Lock()

    def _runPub(self):
        """ PUB thread's main function, sends N messages, when instructed

        On exit (normal or exceptional, e.g. ``bind()`` failing because the
        port is in use) the socket is closed, the context is terminated, and
        ``pubActionEventDone`` is set so that a caller blocked in ``send()``
        is never left waiting forever.
        """
        try:
            self.pubContext = zmq.Context()
            self.pubSocket  = self.pubContext.socket(zmq.PUB)
            # HWM only applies to connections made after it is set
            self.pubSocket.set_hwm(HWM)
            self.pubSocket.bind("tcp://*:5555")

            while not self.stopEvent.is_set():
                self.pubActionEvent.wait()
                if self.stopEvent.is_set():
                    break
                while self.messagesToSend:
                    self.pubSocket.send_multipart(
                        (b'topic', str(self.messagesSendTotal).encode(), BIG_B))
                    with self.tLock:
                        self.messagesToSend    -= 1
                        self.messagesSendTotal += 1
                self.pubActionEvent.clear()
                self.pubActionEventDone.set()
        finally:
            if self.pubSocket is not None:
                # linger=0: discard anything still queued instead of letting
                # term() block on a subscriber that is no longer reading.
                self.pubSocket.close(linger=0)
                self.pubSocket = None
            if self.pubContext is not None:
                # Terminate the context so its I/O threads are released too.
                self.pubContext.term()
                self.pubContext = None
            self.pubActionEventDone.set()

    def start(self):
        """ start threaded PUB """
        self.pubThread = threading.Thread(target=self._runPub, name="PubThread")
        self.pubThread.start()

    def stop(self):
        """ stop threaded PUB and wait for the worker thread to exit

        The stop flag is raised *before* the worker is woken. With the
        opposite order the worker could handle the wake-up, clear it and go
        back to ``wait()`` before the flag was set, so ``join()`` could hang.
        """
        self.stopEvent.set()
        self.pubActionEvent.set()
        self.pubThread.join()

    def send(self, numMessages):
        """ send N messages (in PUB thread) and block until they are sent

        ``pubActionEventDone`` is cleared *before* the worker is woken: it
        is still set from the previous request, so without the clear a
        second call could return before anything was sent.
        """
        self.pubActionEventDone.clear()
        with self.tLock:
            self.messagesToSend += numMessages
        self.pubActionEvent.set()
        self.pubActionEventDone.wait()


def testMain():
    """ memory leak test program

    Publishes NUM_TEST_MESSAGES messages while the SUB thread is idle, so
    they queue up inside ZMQ, pauses for the user, then lets the SUB thread
    drain them. The worker threads are stopped in a ``finally`` block: they
    are non-daemon threads blocked in ``Event.wait()``, so an interrupt
    (Ctrl-C or EOF) at the ``input()`` prompt used to skip
    ``sub.stop()``/``pub.stop()`` and the interpreter could never exit.
    """
    global BIG_B  #pylint: disable=global-statement
    print("------------ System info ------------")
    print("Operating system: {}".format(platform.platform()))
    print("Python version: {}".format(sys.version))
    print("ZMQ verion: {}, PyZMQ version: {}".format(zmq.zmq_version(), zmq.pyzmq_version()))
    print("-------------------------------------\n")
    sub = SubClass()
    pub = PubClass()
    sub.start()
    try:
        print("Started thread with SUB socket")
        time.sleep(1)

        pub.start()
        print("Started thread with PUB socket")
        time.sleep(1)

        print("Sending {} messages START".format(NUM_TEST_MESSAGES))
        pub.send(NUM_TEST_MESSAGES)

        # clean up variable with message template
        BIG_B = None
        import gc
        gc.collect()

        # Wait before receiving in SUB socket
        print("Sending messages DONE, memory should be high\n")
        input("The program is now paused, press enter to start receiving in SUB thread\n")
        print("Receiving messages START")
        sub.receiveMessages(NUM_TEST_MESSAGES)
        print("Receiving messages DONE, memory should be low again")

        print("Sleeping program..., press Ctrl-C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping program...")
    finally:
        sub.stop()
        # pub.start() is not reached if the run is interrupted early
        if pub.pubThread is not None:
            pub.stop()


if __name__ == "__main__":
    testMain()

This gives the following output:

------------ System info ------------
Operating system: Linux-4.9.0-8-amd64-x86_64-with-debian-9.8
Python version: 3.5.3 (default, Sep 27 2018, 17:25:39) 
[GCC 6.3.0 20170516]
ZMQ verion: 4.2.1, PyZMQ version: 16.0.2
-------------------------------------

Started thread with SUB socket
Started thread with PUB socket
Sending 500 messages START
Sending messages DONE, memory should be high

Note: memory usage is now ~6GiB.

The program is now paused, press enter to start receiving in SUB thread

Receiving messages START
Received 500 messages in total
Receiving messages DONE, memory should be low again

Note: memory is now ~5.4GiB

Sleeping program..., press Ctrl-C to stop
^C
Stopping program...

Note that several gigabytes of RAM are not released after the SUB has cleared its queue.

I'm running on Debian linux, as shown above. Could you please try whether my script reproduces the issue on other OSes, other ZMQ versions, other PyZMQ versions?

All 6 comments

Can you make a more complete example? Can you also include:

  • Python version
  • OS
  • pyzmq version
  • libzmq version

I was able to run this script with a subscriber and without (using 1MB of random data for each message) and sent several gigabytes without memory ever rising above 15MB for the sender or receiver.

I have Python 3.7, macOS 10.14, libzmq 4.2.5, pyzmq 17.1.2

Hello minrk,
thanks for the reply.
My setup is:

  • Win 10
  • python 3.6.8
  • pyzmq version: 17.1.2
  • libzmq version: unknown (I installed pyzmq directly via pip)

The code is really nothing much more than what I provided in the first message:

import time

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5556")
topic = "img_rgba"
# Give subscribers time to connect before the first publish.
time.sleep(2)

while True:
    # Here I get a 320*240*4 uint8 numpy array with the image and assign
    # it to the "image" variable.
    socket.send_string(topic, zmq.SNDMORE)
    # send_pyobj pickles the whole array into a new message on every call.
    socket.send_pyobj(image)

In the meantime, I have worked-around the problem by not using the send_pyobj() method:

def send_str_and_nparray(socket, img, string, flags=0, copy=False, track=False):
    """Send ``string``, the array metadata and the raw array buffer as one
    three-frame multipart message.

    Frames: ``string`` (as text), a JSON dict with ``dtype`` and ``shape``,
    then the array's bytes. ``flags`` is applied to every frame; the first
    frame used to be sent with ``zmq.SNDMORE`` only, ignoring the caller's
    flags (e.g. ``zmq.NOBLOCK``) for that frame.

    With the default ``copy=False`` the array's memory is handed to ZMQ
    without copying, so ``img`` must not be modified until the message has
    actually been sent (pass ``track=True`` to get a tracker to wait on).

    Returns whatever ``socket.send`` returns for the last frame.
    """
    md = dict(dtype=str(img.dtype), shape=img.shape)
    socket.send_string(string, flags | zmq.SNDMORE)
    socket.send_json(md, flags | zmq.SNDMORE)
    return socket.send(img, flags, copy=copy, track=track)

inspired by the docs. With the custom method, the memory leak does not occur.
But nonetheless, I would really like to understand why it happened with the "stock" send_pyobj method.

Marco.

Hi Marco,

Can you provide details on the subscriber side of this problem? One issue may be that your subscriber is not keeping up with the publisher and the messages are being queued.

I have the same experience as minrk, with no observable memory leak after sending 1000s of images. Adding a brief sleep on the subscriber side, however, causes first the memory of the subscriber to increase as its queue fills, then the memory of the publisher to increase until its queue is filled. Have you tried setting the high-water mark for your publisher to a low value?

Two questions remain open. First, why don't you observe this with the second method? Maybe it slows the publisher down enough for the subscriber to keep up. Second, why does memory grow even without a subscriber? If no subscribers are subscribed to your publisher, it should drop all messages.

It's really important to provide a complete example that others can run that reproduces the issue. It's hard to know if the important part of the leak is in the subset of the script provided or not. Given that it's in send_pyobj, which uses pickle (extremely inefficient for numpy arrays) how you create the image arrays could be the most important piece. I've tried to fill in the blanks with these scripts:

# sender.py
# Publishes freshly allocated 320x240x4 uint8 arrays via send_pyobj,
# as fast as the socket accepts them.
import os
import time

import numpy as np
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5556")
topic = "img_rgba"
print("sender {}".format(os.getpid()))

# Let subscribers connect first; a PUB socket drops whatever it publishes
# while nobody is subscribed.
time.sleep(2)

while True:
    # Uninitialised buffer: its contents do not matter for this test.
    image = np.empty(shape=(320, 240, 4), dtype=np.uint8)
    socket.send_string(topic, flags=zmq.SNDMORE)
    socket.send_pyobj(image)
# receiver.py
# Subscribes to the "img_rgba" topic and prints a running total of the
# bytes received across all frames.
import itertools
import os

import zmq

ctx = zmq.Context()
s = ctx.socket(zmq.SUB)
s.connect("tcp://127.0.0.1:5556")
s.subscribe(b'img_rgba')

print("receiver {}".format(os.getpid()))
bytes_recvd = 0
for i in itertools.count():
    frames = s.recv_multipart()
    bytes_recvd += sum(map(len, frames))
    print("received {} ({:.0f}MB total)".format(i, bytes_recvd // 1e6))

but they run comfortably for hundreds of thousands of messages and hundreds of gigabytes without ever reaching much above 100MB peak memory in either process (and always dropping back down to ~50MB). This could be a Windows-specific issue, though.

How big is the leak? Does it matter how big messages are? Is it as if whole messages are kept in memory, or is it only a leak of a few bytes per message?

I am encountering the same problem, in the scenario of having exactly 1 PUB and 1 SUB socket.

The scenario is as follows:

  • Process with PUB socket is publishing data
  • Process with SUB socket is receiving this data, but is not reading it immediately
  • Process with PUB socket is done, and stops publishing
  • Process with SUB socket start(/continues) receiving data, and receives all messages.

When this scenario is done, the SUB process does not release all the memory.

I wrote the following test program which reproduces the issue.

#!/usr/bin/env python3
""" Contains test with both sub and pub in different threads """

import platform
import sys
import threading
import time
import zmq

#: Big binary object, with the sample data being sent: 10 MiB of b'0'.
BIG_B = b'0' * (10 << 20)

#: High water mark option for PUB and SUB socket. ZMQ counts it in
#: messages, not bytes: with 10 MiB messages a full queue can hold
#: roughly 1000 x 10 MiB ~= 10 GiB per socket.
HWM = 1000

#: Number of messages to be generated and sent. This is below HWM, so none
#: of them should be dropped: ~500 x 10 MiB ~= 4.9 GiB ends up queued while
#: the SUB side is not reading.
NUM_TEST_MESSAGES = 500

class SubClass():
    """ Thread class with SUB socket

    The SUB socket and its context are created, used and cleaned up inside
    the worker thread started by ``start()``. The controlling thread asks
    for messages with ``receiveMessages()`` and ends the worker with
    ``stop()``.
    """

    def __init__(self):
        self.subSocket              = None
        self.subContext             = None
        self.subThread              = None
        # Set by the controller to wake the worker (new request or stop)
        self.subActionEvent         = threading.Event()
        # Set by the worker once the requested messages have been received
        self.subActionEventDone     = threading.Event()
        self.stopEvent              = threading.Event()

        self.messagesToReceive      = 0
        self.messagesReceivedTotal  = 0
        # Sequence number expected next; -1 until the first message arrives
        self.nextReceive = -1

        # Guards the counters updated by both controller and worker thread
        self.tLock = threading.Lock()

    def start(self):
        """ start threaded SUB """
        self.subThread = threading.Thread(target=self._runSub, name="SubThread")
        self.subThread.start()

    def stop(self):
        """ stop threaded SUB and wait for the worker thread to exit

        The stop flag is raised *before* the worker is woken. With the
        opposite order the worker could handle the wake-up, clear it and go
        back to ``wait()`` before the flag was set, so ``join()`` could hang.
        """
        self.stopEvent.set()
        self.subActionEvent.set()
        self.subThread.join()

    def _runSub(self):
        """ SUB thread's main function, receives N messages, when instructed

        On exit (normal or exceptional) the socket is closed, the context is
        terminated, and ``subActionEventDone`` is set so that a caller
        blocked in ``receiveMessages()`` is never left waiting forever.
        """
        try:
            self.subContext = zmq.Context()
            self.subSocket  = self.subContext.socket(zmq.SUB)
            # HWM only applies to connections made after it is set
            self.subSocket.set_hwm(HWM)
            self.subSocket.connect("tcp://localhost:5555")
            self.subSocket.subscribe('')
            while not self.stopEvent.is_set():
                self.subActionEvent.wait()
                if self.stopEvent.is_set():
                    break
                while self.messagesToReceive:
                    topic, tag, binMessage = self.subSocket.recv_multipart()
                    # Only the tag is used; release the (large) payload now
                    # instead of keeping the last one alive while idle.
                    del topic, binMessage

                    # Check the sequence counter in the received messages, and print if there is
                    # a mismatch
                    currentReceive = int(tag.decode())
                    with self.tLock:
                        if self.nextReceive != -1 and currentReceive != self.nextReceive:
                            print(
                                "Counter mismatch, expected {}, received {}".format(
                                    self.nextReceive, currentReceive))
                        self.nextReceive            = currentReceive + 1
                        self.messagesToReceive     -= 1
                        self.messagesReceivedTotal += 1
                print("Received {} messages in total".format(self.messagesReceivedTotal))
                self.subActionEvent.clear()
                self.subActionEventDone.set()
        finally:
            if self.subSocket is not None:
                # Closing the socket releases the messages still queued in it;
                # linger=0 keeps term() from waiting on pending outgoing
                # (subscription) traffic.
                self.subSocket.close(linger=0)
                self.subSocket = None
            if self.subContext is not None:
                # Terminate the context so its I/O threads are released too.
                self.subContext.term()
                self.subContext = None
            self.subActionEventDone.set()

    def receiveMessages(self, numMessages):
        """ receive N messages (in SUB thread) and block until they arrived

        ``subActionEventDone`` is cleared *before* the worker is woken: it
        is still set from the previous request, so without the clear a
        second call could return before any message was received.
        """
        self.subActionEventDone.clear()
        with self.tLock:
            self.messagesToReceive += numMessages
        self.subActionEvent.set()
        self.subActionEventDone.wait()


class PubClass:
    """ Thread class with PUB socket

    The PUB socket and its context are created, used and cleaned up inside
    the worker thread started by ``start()``. The controlling thread asks
    for messages to be published with ``send()`` and ends the worker with
    ``stop()``.
    """
    def __init__(self):
        self.pubSocket          = None
        self.pubContext         = None
        self.pubThread          = None
        # Set by the controller to wake the worker (new request or stop)
        self.pubActionEvent     = threading.Event()
        # Set by the worker once the requested messages have been sent
        self.pubActionEventDone = threading.Event()
        self.stopEvent          = threading.Event()

        self.messagesToSend     = 0
        # Also used as the sequence tag of the next published message
        self.messagesSendTotal  = 0

        # Guards the counters updated by both controller and worker thread
        self.tLock = threading.Lock()

    def _runPub(self):
        """ PUB thread's main function, sends N messages, when instructed

        On exit (normal or exceptional, e.g. ``bind()`` failing because the
        port is in use) the socket is closed, the context is terminated, and
        ``pubActionEventDone`` is set so that a caller blocked in ``send()``
        is never left waiting forever.
        """
        try:
            self.pubContext = zmq.Context()
            self.pubSocket  = self.pubContext.socket(zmq.PUB)
            # HWM only applies to connections made after it is set
            self.pubSocket.set_hwm(HWM)
            self.pubSocket.bind("tcp://*:5555")

            while not self.stopEvent.is_set():
                self.pubActionEvent.wait()
                if self.stopEvent.is_set():
                    break
                while self.messagesToSend:
                    self.pubSocket.send_multipart(
                        (b'topic', str(self.messagesSendTotal).encode(), BIG_B))
                    with self.tLock:
                        self.messagesToSend    -= 1
                        self.messagesSendTotal += 1
                self.pubActionEvent.clear()
                self.pubActionEventDone.set()
        finally:
            if self.pubSocket is not None:
                # linger=0: discard anything still queued instead of letting
                # term() block on a subscriber that is no longer reading.
                self.pubSocket.close(linger=0)
                self.pubSocket = None
            if self.pubContext is not None:
                # Terminate the context so its I/O threads are released too.
                self.pubContext.term()
                self.pubContext = None
            self.pubActionEventDone.set()

    def start(self):
        """ start threaded PUB """
        self.pubThread = threading.Thread(target=self._runPub, name="PubThread")
        self.pubThread.start()

    def stop(self):
        """ stop threaded PUB and wait for the worker thread to exit

        The stop flag is raised *before* the worker is woken. With the
        opposite order the worker could handle the wake-up, clear it and go
        back to ``wait()`` before the flag was set, so ``join()`` could hang.
        """
        self.stopEvent.set()
        self.pubActionEvent.set()
        self.pubThread.join()

    def send(self, numMessages):
        """ send N messages (in PUB thread) and block until they are sent

        ``pubActionEventDone`` is cleared *before* the worker is woken: it
        is still set from the previous request, so without the clear a
        second call could return before anything was sent.
        """
        self.pubActionEventDone.clear()
        with self.tLock:
            self.messagesToSend += numMessages
        self.pubActionEvent.set()
        self.pubActionEventDone.wait()


def testMain():
    """ memory leak test program

    Publishes NUM_TEST_MESSAGES messages while the SUB thread is idle, so
    they queue up inside ZMQ, pauses for the user, then lets the SUB thread
    drain them. The worker threads are stopped in a ``finally`` block: they
    are non-daemon threads blocked in ``Event.wait()``, so an interrupt
    (Ctrl-C or EOF) at the ``input()`` prompt used to skip
    ``sub.stop()``/``pub.stop()`` and the interpreter could never exit.
    """
    global BIG_B  #pylint: disable=global-statement
    print("------------ System info ------------")
    print("Operating system: {}".format(platform.platform()))
    print("Python version: {}".format(sys.version))
    print("ZMQ verion: {}, PyZMQ version: {}".format(zmq.zmq_version(), zmq.pyzmq_version()))
    print("-------------------------------------\n")
    sub = SubClass()
    pub = PubClass()
    sub.start()
    try:
        print("Started thread with SUB socket")
        time.sleep(1)

        pub.start()
        print("Started thread with PUB socket")
        time.sleep(1)

        print("Sending {} messages START".format(NUM_TEST_MESSAGES))
        pub.send(NUM_TEST_MESSAGES)

        # clean up variable with message template
        BIG_B = None
        import gc
        gc.collect()

        # Wait before receiving in SUB socket
        print("Sending messages DONE, memory should be high\n")
        input("The program is now paused, press enter to start receiving in SUB thread\n")
        print("Receiving messages START")
        sub.receiveMessages(NUM_TEST_MESSAGES)
        print("Receiving messages DONE, memory should be low again")

        print("Sleeping program..., press Ctrl-C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping program...")
    finally:
        sub.stop()
        # pub.start() is not reached if the run is interrupted early
        if pub.pubThread is not None:
            pub.stop()


if __name__ == "__main__":
    testMain()

This gives the following output:

------------ System info ------------
Operating system: Linux-4.9.0-8-amd64-x86_64-with-debian-9.8
Python version: 3.5.3 (default, Sep 27 2018, 17:25:39) 
[GCC 6.3.0 20170516]
ZMQ verion: 4.2.1, PyZMQ version: 16.0.2
-------------------------------------

Started thread with SUB socket
Started thread with PUB socket
Sending 500 messages START
Sending messages DONE, memory should be high

Note: memory usage is now ~6GiB.

The program is now paused, press enter to start receiving in SUB thread

Receiving messages START
Received 500 messages in total
Receiving messages DONE, memory should be low again

Note: memory is now ~5.4GiB

Sleeping program..., press Ctrl-C to stop
^C
Stopping program...

Note that several gigabytes of RAM are not released after the SUB has cleared its queue.

I'm running on Debian linux, as shown above. Could you please try whether my script reproduces the issue on other OSes, other ZMQ versions, other PyZMQ versions?

Hello minrk,
thanks for the reply.
My setup is:

  • Win 10
  • python 3.6.8
  • pyzmq version: 17.1.2
  • libzmq version: unknown (I installed pyzmq directly via pip)

The code is really nothing much more than what I provided in the first message:

import time

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5556")
topic = "img_rgba"
# Give subscribers time to connect before the first publish.
time.sleep(2)

while True:
    # Here I get a 320*240*4 uint8 numpy array with the image and assign
    # it to the "image" variable.
    socket.send_string(topic, zmq.SNDMORE)
    # send_pyobj pickles the whole array into a new message on every call.
    socket.send_pyobj(image)

In the meantime, I have worked-around the problem by not using the send_pyobj() method:

def send_str_and_nparray(socket, img, string, flags=0, copy=False, track=False):
    """Send ``string``, the array metadata and the raw array buffer as one
    three-frame multipart message.

    Frames: ``string`` (as text), a JSON dict with ``dtype`` and ``shape``,
    then the array's bytes. ``flags`` is applied to every frame; the first
    frame used to be sent with ``zmq.SNDMORE`` only, ignoring the caller's
    flags (e.g. ``zmq.NOBLOCK``) for that frame.

    With the default ``copy=False`` the array's memory is handed to ZMQ
    without copying, so ``img`` must not be modified until the message has
    actually been sent (pass ``track=True`` to get a tracker to wait on).

    Returns whatever ``socket.send`` returns for the last frame.
    """
    md = dict(dtype=str(img.dtype), shape=img.shape)
    socket.send_string(string, flags | zmq.SNDMORE)
    socket.send_json(md, flags | zmq.SNDMORE)
    return socket.send(img, flags, copy=copy, track=track)

inspired by the docs. With the custom method, the memory leak does not occur.
But nonetheless, I would really like to understand why it happened with the "stock" send_pyobj method.

Marco.

In case it is useful for someone else, I had the same problem and the same solution as @marcociara379. That is, I based my solution on this code extracted from the docs:

import numpy
import zmq  # used for zmq.SNDMORE below; was missing from the snippet


def send_array(socket, A, flags=0, copy=True, track=False):
    """send a numpy array with metadata

    Sends two frames: a JSON header with ``dtype`` and ``shape``, then the
    raw array bytes. ``recv_array`` reshapes those bytes in C (row-major)
    order, so they are sent in C order too: non-C-contiguous inputs
    (slices, transposes, Fortran-ordered arrays) are first copied into a
    C-contiguous buffer, while already C-contiguous arrays are not copied.
    """
    md = dict(
        dtype=str(A.dtype),
        shape=A.shape,
    )
    socket.send_json(md, flags | zmq.SNDMORE)
    return socket.send(numpy.ascontiguousarray(A), flags, copy=copy, track=track)


def recv_array(socket, flags=0, copy=True, track=False):
    """recv a numpy array

    Expects the two frames produced by ``send_array``. With ``copy=True``
    the frame arrives as ``bytes``, so the returned array is a read-only
    view of it; call ``.copy()`` if it needs to be modified.
    """
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    buf = memoryview(msg)
    A = numpy.frombuffer(buf, dtype=md['dtype'])
    return A.reshape(md['shape'])

My setup is:

  • Ubuntu: 18.04
  • pyzmq: 19.0.1
  • python: 3.6.9

In addition, I would like to know why send_pyobj() and recv_pyobj() sometimes have a memory leak.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

mirceaulinic · 8 Comments

stuaxo · 15 Comments

hydrogen18 · 26 Comments

d53dave · 7 Comments

SavageMessiah · 23 Comments