Pyzmq: Problem with the High Water Mark option in ZMQ communication

Created on 20 Nov 2018 · 3 Comments · Source: zeromq/pyzmq

I have a problem with the high-water mark (HWM) option in ZMQ communication:

According to the ZeroMQ documentation, a pub socket is supposed to drop messages once the number of queued messages reaches the high-water mark.

This doesn't seem to work in the following example (and yes, I do set the HWM before bind/connect):

import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.set_hwm(2)
    pub.bind('tcp://*:5555')

    i = 0
    while True:
        # Send message every 100ms
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.subscribe("test")
    sub.connect('tcp://localhost:5555')
    while True:
        # Receive messages only every second
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

# Wait for the worker threads instead of busy-waiting in the main thread.
t_pub.join()
t_sub.join()

I'm sending messages on the pub socket 10 times faster than I read them on the sub socket, and the HWM is set to 2. I would expect to receive only about every 10th message. Instead, I see the following output:

Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...

So all messages arrive, which means they are held in some queue until I read them. The same holds true when I also set hwm=2 on the sub socket before connect.

What am I doing wrong, or am I misunderstanding HWM?

I use pyzmq version 17.1.2.


I also tried the following code, but the result was the same:

import time
import pickle
import zmq
from threading import Thread

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)  # This line was changed.
    pub.bind('tcp://*:5555')

    i = 0
    while True:
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')  # Subscribe to all topics.
    sub.setsockopt(zmq.RCVHWM, 2)  # This line added.
    # sub.setsockopt(zmq.CONFLATE, 1)  # Last msg only (another option).
    sub.connect('tcp://localhost:5555')
    while True:
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()  # Start with main thread.

These Stack Overflow posts (POST1, POST2, POST3) are relevant to this issue.


Thanks in advance,

All 3 comments

Messages are held in the operating system's network buffers. I have found HWMs not to be that useful because of that. Here is modified code in which the subscriber misses messages:

import os
import time
from threading import Thread

import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)
    # Shrink the kernel send buffer as well; see: http://api.zeromq.org/4-2:zmq-setsockopt
    pub.setsockopt(zmq.SNDBUF, 2*1024)
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        # Publish about every millisecond: a sequence-number frame plus a 1 KiB payload.
        time.sleep(0.001)
        pub.send_string(str(i), zmq.SNDMORE)
        pub.send(os.urandom(1024))
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')  # Subscribe to all topics.
    sub.setsockopt(zmq.RCVHWM, 2)
    sub.setsockopt(zmq.RCVBUF, 2*1024)  # Shrink the kernel receive buffer too.
    sub.connect('tcp://localhost:5555')
    while True:
        # Read 100 times slower than the publisher sends.
        time.sleep(0.1)
        msg, _ = sub.recv_multipart()
        print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()  # Run the subscriber in the main thread.

Output looks something like this:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270

Messages are missed because all queues/buffers are full and the publisher starts to drop messages (see the documentation for ZMQ_PUB: http://api.zeromq.org/4-2:zmq-socket).
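
To see how much was actually dropped, one option is to compare consecutive sequence numbers on the subscriber side. The snippet below is a minimal sketch of that idea in plain Python; `find_gaps` is a hypothetical helper name, and the list is simply the sequence numbers copied from the sample output above.

```python
def find_gaps(received):
    """Return (first_missing, last_missing) ranges between consecutive numbers."""
    gaps = []
    for prev, cur in zip(received, received[1:]):
        if cur - prev > 1:
            gaps.append((prev + 1, cur - 1))
    return gaps


# Sequence numbers copied from the sample output above.
received = [0, 1, 2, 3, 4, 5, 6, 47, 48, 64, 65, 84, 85, 159, 160, 270]

gaps = find_gaps(received)
dropped = sum(last - first + 1 for first, last in gaps)

for first, last in gaps:
    print("missing %d..%d" % (first, last))
print("dropped in total:", dropped)
```

In the subscriber loop, the same check could be done incrementally by remembering the last sequence number seen and comparing it with the next one received.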

@haverins Thanks, it works.

Is there any other way to set a limit on the number of buffered messages instead of the buffer size?
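
For context on this question: the HWM options count messages, while SNDBUF/RCVBUF are sized in bytes, so how many messages the kernel buffers can hold depends on message size. The function below is only a back-of-the-envelope sketch under that assumption. The name `approx_messages_in_flight`, the 20-byte message size, and the 64 KiB buffer size are hypothetical, and the estimate ignores libzmq's internal batching, framing and TCP overhead, and any adjustment the OS makes to the requested buffer sizes.

```python
def approx_messages_in_flight(sndhwm, rcvhwm, sndbuf, rcvbuf, msg_bytes):
    """Rough upper bound on messages queued between publisher and subscriber.

    HWMs are counted in messages; kernel buffers are counted in bytes,
    so they are converted using an assumed per-message size.
    """
    kernel_messages = sndbuf // msg_bytes + rcvbuf // msg_bytes
    return sndhwm + rcvhwm + kernel_messages


# Small messages with hypothetical 64 KiB kernel buffers:
# thousands of messages fit, so HWM=2 alone is never the limit.
small = approx_messages_in_flight(2, 2, 64 * 1024, 64 * 1024, msg_bytes=20)

# Roughly 1 KiB messages with 2 KiB kernel buffers, as in the modified code:
# only a handful of messages fit before the publisher starts dropping.
large = approx_messages_in_flight(2, 2, 2 * 1024, 2 * 1024, msg_bytes=1040)

print("small messages, large buffers:", small)
print("large messages, small buffers:", large)
```

Under this rough model, the message-count limit only becomes visible once the byte-sized buffers are small relative to the message size, which matches what the modified code above does.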

At first glance, the set_hwm method looks broken for modern 0MQ.
It is effectively:

        if major >= 3:
            raised = None
            try:
                self.sndhwm = value
            except Exception as e:
                raised = e
            try:
                self.rcvhwm = value
            except Exception as e:
                raised = e

            if raised:
                raise raised

sndhwm is not used in any other way.
Am I missing something?

Update: yes, I am missing AttributeSetter, sorry.
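
The point of the update is that an assignment such as `self.sndhwm = value` does not just store a Python attribute: it is intercepted and turned into a socket-option call. The toy class below is a simplified model of that mechanism, not pyzmq's actual AttributeSetter code. `ToySocket`, `OPTIONS`, and `applied` are hypothetical names, and the option numbers are assumed to match ZMQ_SNDHWM and ZMQ_RCVHWM in zmq.h.

```python
class ToySocket:
    """Toy stand-in for a socket; NOT pyzmq's real implementation."""

    # Assumed option numbers for ZMQ_SNDHWM / ZMQ_RCVHWM.
    OPTIONS = {"SNDHWM": 23, "RCVHWM": 24}

    def __init__(self):
        # Bypass our own __setattr__ to create the bookkeeping dict.
        object.__setattr__(self, "applied", {})

    def setsockopt(self, opt, value):
        # A real socket would call into libzmq here; we only record the call.
        self.applied[opt] = value

    def __setattr__(self, name, value):
        # Map attribute names like "sndhwm" to option constants like SNDHWM.
        opt = self.OPTIONS.get(name.upper())
        if opt is None:
            raise AttributeError("no such socket option: %s" % name)
        self.setsockopt(opt, value)

    def set_hwm(self, value):
        # Same shape as the snippet quoted above: two attribute assignments.
        self.sndhwm = value
        self.rcvhwm = value


sock = ToySocket()
sock.set_hwm(2)
print(sock.applied)
```

With this model, `set_hwm(2)` ends up as two `setsockopt` calls, one for each direction, which is why the quoted snippet is not a no-op.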
