Hey gang,
We’ve been working with ZeroMQ and have had a lot of success using it to
provide a highly agile environment for moving data to wherever we want it to
go. Unfortunately, we have lately been seeing a problem with our Aggregation
Server (Sink), which is really just a PULL socket that receives each message
and then publishes it on a PUB socket.
Everything runs smoothly for a while, sometimes for the better part of a day.
At some point, however, ZeroMQ appears to block while shuttling messages from
the PULL socket to the PUB socket. According to netstat, the port is still
accessible, but all of the data I would expect to see published is instead
building up in memory held by the zeromq PULL socket. That is expected, given
that the messages are not being drained correctly. Essentially, since the PULL
socket can’t empty its queue, it continues to receive messages but never
forwards them to the publish port and never frees the message buffers.
The following is our current C code for the server, which we have dubbed the
Sink. We are running zeromq v2.1.10, and we have also tried 2.1.8 and seen
the same results. At this point we have client daemons connected from
approximately 700 nodes, all pushing data to this server. Any assistance with
this matter would be great; we’ve been trying to understand why the socket
seems to become unreachable at random times while trying to write to the
publish port. Running strace -p <pid> -f on the process shows recvfrom calls
with message data in the payload, but we never see that data written back out
the publish port.
Thanks again,
//
// Aggregation Server
//
#include "zhelpers.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Free callback that zeromq invokes once it is done with the message buffer
void my_free (void *data, void *hint) {
    free (data);
}

int main () {
    s_version ();
    void *context = zmq_init (2);

    // Sink socket
    void *sink = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sink, "tcp://*:5565");

    // Pub socket
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5566");

    zmq_pollitem_t items[] = {
        { sink, 0, ZMQ_POLLIN, 0 },
    };
    size_t poll_size = (sizeof (items) / sizeof (zmq_pollitem_t));

    while (1) {
        // Wait for a message on the sink socket
        zmq_poll (items, poll_size, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_t buf;
            zmq_msg_init (&buf);
            if (zmq_recv (sink, &buf, 0)) {
                printf ("WARNING recv failure\n");
                zmq_msg_close (&buf);
                continue;
            }
            // Copy the payload into a buffer that the outgoing message will own
            size_t size = zmq_msg_size (&buf);
            char *string = malloc (size + 1);
            if (!string) {
                printf ("WARNING malloc failure\n");
                zmq_msg_close (&buf);
                continue;
            }
            memcpy (string, zmq_msg_data (&buf), size);
            string [size] = 0;
            zmq_msg_close (&buf);

            // Publish the copy; my_free releases it once zeromq is done with it
            zmq_msg_t msg;
            zmq_msg_init_data (&msg, string, size, my_free, NULL);
            if (zmq_send (publisher, &msg, 0))
                printf ("WARNING send failure\n");
            zmq_msg_close (&msg);
        }
    }
    // We never get here but clean up anyhow
    printf ("WARNING exiting\n");
    zmq_close (publisher);
    zmq_close (sink);
    zmq_term (context);
    return 0;
}
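
In case it helps with narrowing this down, here is a minimal sketch of the
kind of counters we could add to the loop above to confirm whether messages
are being received but never forwarded. The names (relay_stats_t and the
relay_stats_* helpers) are hypothetical and are not part of the running Sink
or of ZeroMQ. The only assumption is that a return code of 0 means success,
as with zmq_recv and zmq_send in 2.1.

```c
#include <assert.h>
#include <stdio.h>

// Hypothetical diagnostic counters; none of these names exist in the Sink
// or in the ZeroMQ API.
typedef struct {
    unsigned long received;     // messages successfully read from the PULL socket
    unsigned long forwarded;    // messages successfully handed to the PUB socket
    unsigned long recv_errors;  // receive calls that returned non-zero
    unsigned long send_errors;  // send calls that returned non-zero
} relay_stats_t;

// Record the outcome of one receive call (assumes rc == 0 means success)
static void relay_stats_on_recv (relay_stats_t *stats, int rc) {
    if (rc == 0)
        stats->received++;
    else
        stats->recv_errors++;
}

// Record the outcome of one send call (assumes rc == 0 means success)
static void relay_stats_on_send (relay_stats_t *stats, int rc) {
    if (rc == 0)
        stats->forwarded++;
    else
        stats->send_errors++;
}

// Number of messages that were received but not forwarded
static unsigned long relay_stats_pending (const relay_stats_t *stats) {
    assert (stats->forwarded <= stats->received);
    return stats->received - stats->forwarded;
}

// Print a one-line summary, e.g. once every few thousand messages
static void relay_stats_print (const relay_stats_t *stats, FILE *out) {
    fprintf (out, "recv=%lu fwd=%lu pending=%lu recv_err=%lu send_err=%lu\n",
             stats->received, stats->forwarded, relay_stats_pending (stats),
             stats->recv_errors, stats->send_errors);
}
```

The idea would be to pass the result of zmq_recv to relay_stats_on_recv and
the result of zmq_send to relay_stats_on_send, then print the totals
periodically. A pending count that keeps growing would point at the forwarding
step rather than at the receive side.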
--
Matt West
_______________________________________________
zeromq-dev mailing list
http://lists.zeromq.org/mailman/listinfo/zeromq-dev