Hi All,

I am wanting to use libev and zeromq to design an asynchronous proxy like
application. But I found that is not straightforward to monitor zmq sockets
using libev:
http://funcptr.net/2013/04/20/embedding-zeromq-in-the-libev-event-loop/

I am not able to follow the above article properly. Can someone please
explain how to go about doing this using libev. Thanks!

I did find some code on the net, and made minor changes to it because the
original was using an older zeromq version. It is a very simple server
which uses a zeromq REP socket. But the problem is that the recv call
always returns a non-zero value, indicating that the recv failed. Any ideas?

Please find the code attached.
Thank you,

Vishal
	#include <stdio.h>
	#include <stdint.h>
	#include <stdlib.h>
	#include <string.h>
	#include <stdbool.h>
	#include <ctype.h>
	#include <zmq.h>
	#include <ev.h>

	void *zmq_ctx = NULL;
	void *sock;
	bool sending = false;

	

	void watcher_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
	{
	    uint32_t events;
	    size_t opt_len = sizeof(uint32_t);

	    fprintf(stderr, "Event: EV revents=%d\n", revents);
	    if (EV_ERROR & revents)
	    {
		fprintf(stderr, "invalid event\n");
		return;
	    }

	    while(1) {
		if (0 != zmq_getsockopt(sock, ZMQ_EVENTS, &events, &opt_len))
		{
		    fprintf(stderr, "ZMQ_EVENTS failed\n");
		    return;
		}
		fprintf(stderr, "Event: zmq_events=%d\n", events);

		if (events & ZMQ_POLLIN)
		{
		    char buffer[50];
		    fprintf(stderr, "Event: ZMQ_POLLIN\n");
		   
		    if (0 != zmq_recv(sock, buffer,50 ,ZMQ_NOBLOCK))
		    {
			fprintf(stderr, "zmq_recv failed\n");
			return;
		    }
		    fprintf(stderr, "RX: %s", buffer);
		   
		    sending = true;
		    fprintf(stderr, "Received packet, waiting to send response\n");
		}
		if (!sending)   // we have nothing to write
		    return;

		if (events & ZMQ_POLLOUT)
		{
		    //zmq_msg_t msg;
		    char *buf = "A reply";

		    if (0 != zmq_send(sock, buf,7, 0))
		    {
			fprintf(stderr, "zmq_send failed\n");
			return;
		    }
		    fprintf(stderr, "TX: ");
		    sending = false;
		    fprintf(stderr, "Sent packet, waiting for new request\n");
		}
	    }

	}

	int main(int argc, char *argv[])
	{
	     int rc;
	     struct ev_loop *loop;
	     struct ev_io *watcher;
	     int zmq_fd;
	     size_t opt_len = sizeof(int);

       	     if (NULL == (watcher = (struct ev_io *)malloc(sizeof(struct ev_io))))
	     {
		  fprintf(stderr, "out of memory\n");
		  return 1;
	     }

	     loop = ev_default_loop(0);

	     zmq_ctx = zmq_init(1);

	     sock = zmq_socket(zmq_ctx, ZMQ_REP);
	     if (sock == NULL)
	     {
		  fprintf(stderr, "socket!\n");
		  return 0;
	     }

	     rc = zmq_bind(sock, "tcp://*:5555");
	     if (rc < 0)
	     {
	         fprintf(stderr, "bind failed\n");
	         return 1;
	     }

	    if (0 != zmq_getsockopt(sock, ZMQ_FD, (void *)&zmq_fd, &opt_len))
	    {
		  fprintf(stderr, "ZMQ_FD failed\n");
		  return 1;
	    }

	    ev_io_init(watcher, watcher_cb, zmq_fd, EV_READ);
	    ev_io_start(loop, watcher);

	    while(1)
	    {
		ev_loop(loop, 0);
	    }
	    return 0;
	}

_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev

Reply via email to