Hi Matthew, A couple of comments and questions:
- why are you using 2 i/o threads? That is unnecessary - what's your message rate? - how many CPUs on the system? - why create the second message via a temporary string, when you can just send the message you received, as-is? The code otherwise looks fine. So what you're seeing is that the code blocks in the zmq_poll() call and never returns? -Pieter On Mon, Oct 17, 2011 at 3:12 PM, Matthew West <[email protected]> wrote: > Hey gang, > > So we’ve been working with ZeroMQ and had a lot of success with using it > to provide a highly agile environment when it comes to moving data around > where ever we may want it to go. Unfortunately we are seeing a problem as of > lately with our Aggregation Server (Sink), which is in all actuality a Pull > Socket which receives a message and then publishes it to another Pub Socket. > Everything runs smoothly for a little while, sometimes even the better part > of a day. Unfortunately however, it would seem like ZeroMQ gets blocked at > some point with trying to shuttle the messages from the pull socket to the > publish socket. According to netstat, the port is still accessible, but all > of the information I would expect to see is actually building up in a memory > cache being controlled by zeromq pull socket, this is expected considering > we aren’t expunging the messages correctly. Essentially since the Pull > socket can’t empty its messages, it just continues to receive them but never > pushes them to the publish port and never performs a free on the msg buffer > / message. > > The following is our current C code server that we have dubbed the Sink. > We are running zeromq v2.1.10, and we have also tried this out with 2.1.8 > and seen the same results. We have client daemons connected from > approximately 700 nodes at this point, pushing data to this server. Any > assistance with this matter would be great, we’ve been trying to understand > why it seems like the socket just becomes unreachable at random times while > trying to print information to the publish port. Running a strace –p <pid> > -f on the process shows us doing recvfrom with msg data in the payload, but > we are never seeing that data be printed back out the publish port. > > Thanks again, > > // > // Aggregation Server > // > #include "zhelpers.h" > #include <stdio.h> > > void my_free (void *data,void *hint) { > free(data); > } > > int main () { > s_version (); > > void *context = zmq_init (2); > > // Sink socket > void *sink = zmq_socket (context, ZMQ_PULL); > zmq_bind (sink, "tcp://*:5565"); > > // Pub socket > void *publisher = zmq_socket (context, ZMQ_PUB); > zmq_bind (publisher, "tcp://*:5566"); > > zmq_pollitem_t items[] = { > { sink,0,ZMQ_POLLIN,0 }, > }; > > size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t)); > > while (1) { > // Read message contents > zmq_poll (items,poll_size,-1); > if(items [0].revents & ZMQ_POLLIN) { > zmq_msg_t buf; > zmq_msg_init (&buf); > if(zmq_recv(sink,&buf,0)) { > printf("WARNING recv failure\n"); > continue; > } > int size = zmq_msg_size(&buf); > char *string = malloc(size+1); > memcpy(string,zmq_msg_data(&buf),size); > string[size] = 0; > zmq_msg_close(&buf); > > zmq_msg_t msg; > zmq_msg_init_data(&msg,string,size,my_free,NULL); > zmq_send(publisher,&msg,0); > zmq_msg_close(&msg); > } > } > > // We never get here but clean up anyhow > printf("WARNING exiting\n"); > zmq_close (publisher); > zmq_close (sink); > zmq_term (context); > return 0; > } > > -- > > Matt West > _______________________________________________ > zeromq-dev mailing list > [email protected] > http://lists.zeromq.org/mailman/listinfo/zeromq-dev > > _______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
