As far as I can tell from reading all of the documentation I can find,
by default when you use a PUSH/PULL socket and don't mess with the
LINGER socket option, when you terminate your 0MQ context at the PUSH
end, it should hang until all the messages have been successfully
delivered to the PULL end.
What effect does setting RCVHWM, i.e., the receive high water mark, on
the PULL end have on this? The documentation appears to be completely
silent on this question. For example, the zmq_socket documentation
<http://api.zeromq.org/3-2:zmq-socket> says nothing about the effect of
setting RCVHWM on a PULL socket.
Counterintuitively, it appears that if you set RCVHWM too low on the
PULL end, in fact the socket termination does /not/ hang on the PUSH
end, and messages are lost. The script I've appended to the end of this
message demonstrates this behavior. When I run it, the last line of
output it prints is "Received 7", and then it hangs, i.e., two messages
are lost. When I comment out the "sock.setsockopt(zmq.RCVHWM, 1)" line
or set the high water mark to 10 instead of 1, then all messages are
successfully received.
We need to be able to set the receive high water mark to a relatively
low value at both the PUSH and PULL ends of the sockets our application
uses, because we actually want the socket sends to block at the PUSH end
if end PULL end falls too far behind in processing them, to prevent
memory usage by 0MQ from ballooning in the processes at both ends of the
socket.
Is the behavior we are seeing expected, or a bug? If expected, that (a)
is it documented anywhere (I'm trying to figure out what documentation I
should have read but didn't) and (b) what pattern can/should we follow
to achieve our intended goal of reliable delivery of the entire stream
without memory usage by 0MQ ballooning at either end if the PULL end
falls behind?
I am using 0MQ 3.2.4, PyZMQ 14.0.1, and python 2.7.5 on Fedora 20 Linux
(x86_64).
Thanks,
Jonathan Kamens
------------------------------------------------------------------------
from multiprocessing import Process
import time
import zmq
addr = 'ipc:///tmp/test-socket'
num_messages = 10
send_duration = 2
receive_duration = 4
def sender():
# Send 20 messages in 2 seconds
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind(addr)
sleep_time = float(send_duration) / num_messages
for i in range(num_messages):
sock.send(str(i))
print 'Sent %d' % i
time.sleep(sleep_time)
sock.close()
ctx.term()
print 'Sender exiting'
Process(target=sender).start()
ctx = zmq.Context()
sock = ctx.socket(zmq.PULL)
sock.setsockopt(zmq.RCVHWM, 1)
sock.connect(addr)
sleep_time = float(receive_duration) / num_messages
for i in range(num_messages):
msg = sock.recv()
print 'Received %s' % msg
time.sleep(sleep_time)
sock.close()
ctx.term()
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev