Hi,
(My current ØMQ version is 3.2.2)
I have one thread that produces data at 40 MB/s and publishes it over a PUB
socket in 2 kB messages. Multiple clients can subscribe (SUB sockets). With
one of the SUB clients, I want to save the data to disk. However, the client
that writes to the hard disk loses messages, while the other clients, which
don't save the data to disk, receive all messages.
The disk *should* be fast enough to handle the data rate; I tested with:
sync;
time dd if=/dev/zero of=data.tmp bs=2k count=1000000; time sync
1000000+0 records in
1000000+0 records out
2048000000 bytes (2.0 GB) copied, 11.7767 s, 174 MB/s
real 0m11.930s   (dd)
user 0m0.235s
sys 0m7.886s
real 0m2.064s    (sync)
user 0m0.001s
sys 0m0.011s
The architecture is very simple, below is the relevant code for the
publisher and subscriber.
I suspect there are moments when the client is "very" busy saving/syncing the
data to disk, but averaged over time the rate should be fine, I guess. I
experimented with different ZMQ_RCVHWM/ZMQ_RCVBUF values, but this didn't
seem to have any effect. When the HWM is set to 0, shouldn't the memory
footprint of the client grow whenever the file writing takes 'too' long, so
that the incoming messages get buffered? I monitored the memory consumption
with "top" but didn't see any increase.
I also tried inproc sockets within a single process as well as tcp sockets
running on two machines, with similar behavior:
20 MB/s loses "only a few" messages,
40 MB/s loses more, and
80 MB/s loses a "considerable amount".
I didn't measure the exact numbers, since any lost message is unacceptable
for me at the moment.
I thought that if the file writing is very bursty, I simply need a larger
buffer. Is my receive buffer big enough? Shouldn't it grow dynamically with
the HWM set to 0? Should I bundle several messages together as multipart
messages and thereby reduce the number of times "fwrite" gets called?
Do you have a suggestion? Am I missing something?
Thanks!
Cheers,
Jan
CLIENT:

    zmq::context_t *zmq_context = new zmq::context_t(1);
    zmq::socket_t socket(*zmq_context, ZMQ_SUB);
    socket.connect("tcp://192.168.0.1:3333");
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);   // subscribe to everything
    int val = 20000 * 40;                      // RCVHWM in messages -- OK???
    socket.setsockopt(ZMQ_RCVHWM, &val, sizeof(val));
    val = 20000 * 40 * 2048;                   // RCVBUF in bytes
    socket.setsockopt(ZMQ_RCVBUF, &val, sizeof(val));

    for (;;) {
        zmq::message_t msg;
        socket.recv(&msg);
        fwrite(msg.data(), sizeof(unsigned char), msg.size(), of);  // 'of' is an open FILE*

        /* Sanity check for the frame number */
        static unsigned long long last_frame_no = 0;
        unsigned long long frame_no = 0;
        memcpy(&frame_no, msg.data(), sizeof(unsigned long long));
        if (frame_no > last_frame_no + 1)
            fprintf(stderr, "frames missing %llu\n", frame_no - last_frame_no);
        last_frame_no = frame_no;
    }
SERVER:

    zmq::context_t *zmq_context = new zmq::context_t(1);
    zmq::socket_t workers(*zmq_context, ZMQ_PUB);
    //workers.bind("inproc://workers");
    workers.bind("tcp://*:3333");
    int hwm = 0;                               // 0 = no limit -- OK???
    workers.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));

    unsigned long long frameno = 0;
    for (;;) {
        zmq::message_t message3(sizeof(unsigned long long) +
                                1024 * sizeof(unsigned short));
        memcpy(message3.data(), &frameno, sizeof(unsigned long long));
        /* offset in bytes: casting to unsigned short* before adding
           sizeof(unsigned long long) would skip 16 bytes instead of 8 */
        memcpy((unsigned char *) message3.data() + sizeof(unsigned long long),
               ordered_buffer,                 // payload, filled elsewhere
               1024 * sizeof(unsigned short));
        workers.send(message3);
        ++frameno;
    }
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev