I want to limit the maximum number of messages kept in a queue when working with a PUB/SUB pattern. I use set_hwm(1)
to limit the length to 1 message. However, this does not seem to have any effect.
sender.py
sends a message every 0.1 seconds:
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.set_hwm(1)
socket.bind("tcp://127.0.0.1:5556")
i = 0
while True:
i += 1
socket.send_string(str(i))
print("sent: ", i)
time.sleep(0.1)
receiver.py
receives a message every 1.0 second:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.set_hwm(1)
socket.setsockopt_string(zmq.SUBSCRIBE, optval="")
while True:
msg = socket.recv_string()
print("received: ", msg)
time.sleep(1)
The receiver should only get around 1 out of 10 messages.
The receiver receives every single message. The queue length does not appear to be limited at all.
Output of receiver.py
looks something like this:
$ python client.py
received: 25
received: 26
received: 27
received: 28
received: 29
received: 30
received: 31
received: 32
received: 33
Python 3.5.2
zeromq 16.0.2
Ubuntu 16.04
HWM in zmq is pretty complicated, and can't really be used to control flow in a fine-grained way. It's mainly there to limit memory/network pressure when receivers can't keep up. One of the reasons it is challenging (especially RCVHWM) is that there aren't just two queues. There's also the transport buffer(s) in between the sockets, which tends to be harder to observe and control, and contain messages that have been removed from the sender's queue (SNDHWM) and are waiting to be delivered to the receiver's queue (RCVHWM). So the real HWM is SNDHWM + RCVHWM + X, where X is dependent on a lot of factors.
From testing, it seems like there's a finite buffer, so lots of tiny messages can be waiting to be received by zmq. If you increase the message size to ~1MB, you will see the expected behavior of ~1/10 messages after all of the buffers fill up:
received: 9
received: 10
received: 11
received: 12
received: 19
received: 20
received: 30
received: 39
received: 48
received: 58
received: 69
received: 77
but after stopping the sender, you can see that there are ~5 messages still waiting in the queue that will be delivered after the sender exits.
This question is relevant to my issue #1111 (although with PUSH-PULL instead of PUB-SUB). Is there a way to force the desired behavior (keep only ~1/10) even with messages of small size?
@LinxiFan Maybe you should read the zguide before raising this issue. Nevertheless, your desired behavior, aka. a size-limited queue can be implemented on top of ZeroMQ with setting up a proxy in between, or connect all the nodes to a broker for dispatching.
There is an error in the receiver.py
script, you set the HWM after the connect call, whereas you should set it before the connect call.
So, receiver.py
should be like:
#(...)
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.set_hwm(1)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, optval="")
#(...)