Hey gang,

  So we’ve been working with ZeroMQ and had a lot of success with using it to 
provide a highly agile environment when it comes to moving data around where 
ever we may want it to go. Unfortunately we are seeing a problem as of lately 
with our Aggregation Server (Sink), which is in all actuality a Pull Socket 
which receives a message and then publishes it to another Pub Socket. 
Everything runs smoothly for a little while, sometimes even the better part of 
a day. Unfortunately however, it would seem like ZeroMQ gets blocked at some 
point with trying to shuttle the messages from the pull socket to the publish 
socket. According to netstat, the port is still accessible, but all of the 
information I would expect to see is actually building up in a memory cache 
being controlled by zeromq pull socket, this is expected considering we aren’t 
expunging the messages correctly. Essentially since the Pull socket can’t empty 
its messages, it just continues to receive them but never pushes them to the 
publish port and never performs a free on the msg buffer / message.

  The following is our current C code server that we have dubbed the Sink. We 
are running zeromq v2.1.10, and we have also tried this out with 2.1.8 and seen 
the same results. We have client daemons connected from approximately 700 nodes 
at this point, pushing data to this server. Any assistance with this matter 
would be great, we’ve been trying to understand why it seems like the socket 
just becomes unreachable at random times while trying to print information to 
the publish port. Running a strace –p <pid> -f on the process shows us doing 
recvfrom with msg data in the payload, but we are never seeing that data be 
printed back out the publish port.

Thanks again,

//
//  Aggregation Server
//
#include "zhelpers.h"
#include <stdio.h>

void my_free (void *data,void *hint) {
    free(data);
}

int main () {
    s_version ();

    void *context = zmq_init (2);

    // Sink socket
    void *sink = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sink, "tcp://*:5565");

    // Pub socket
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5566");

    zmq_pollitem_t items[] = {
        { sink,0,ZMQ_POLLIN,0 },
    };

    size_t poll_size = (sizeof(items)/sizeof(zmq_pollitem_t));

    while (1) {
        // Read message contents
        zmq_poll (items,poll_size,-1);
        if(items [0].revents & ZMQ_POLLIN) {
            zmq_msg_t buf;
            zmq_msg_init (&buf);
            if(zmq_recv(sink,&buf,0)) {
                printf("WARNING recv failure\n");
                continue;
            }
            int size = zmq_msg_size(&buf);
            char *string = malloc(size+1);
            memcpy(string,zmq_msg_data(&buf),size);
            string[size] = 0;
            zmq_msg_close(&buf);

            zmq_msg_t msg;
            zmq_msg_init_data(&msg,string,size,my_free,NULL);
            zmq_send(publisher,&msg,0);
            zmq_msg_close(&msg);
        }
    }

    // We never get here but clean up anyhow
    printf("WARNING exiting\n");
    zmq_close (publisher);
    zmq_close (sink);
    zmq_term (context);
    return 0;
}

--

Matt West
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to