Hi All,
I want to use libev and zeromq to design an asynchronous proxy-like
application. But I found that it is not straightforward to monitor zmq sockets
using libev:
http://funcptr.net/2013/04/20/embedding-zeromq-in-the-libev-event-loop/
I am not able to follow the above article properly. Can someone please
explain how to go about doing this using libev? Thanks!
I did find some code on the net, and made minor changes to it because the
original was using an older zeromq version. It is a very simple server
which uses a zeromq REP socket. But the problem is that the recv call
always returns a non-zero value, indicating that the recv failed. Any ideas?
Please find the code attached.
Thank you,
Vishal
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <ctype.h>
#include <zmq.h>
#include <ev.h>
/*
 * State shared between main() and the libev callback.
 *
 * sending  - true once a request has been received and a reply is still owed;
 *            cleared again after the reply has been sent.
 * sock     - the ZeroMQ socket handle created in main().
 * zmq_ctx  - the ZeroMQ context handle created in main().
 */
bool sending = false;
void *sock = NULL;
void *zmq_ctx = NULL;
void watcher_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
uint32_t events;
size_t opt_len = sizeof(uint32_t);
fprintf(stderr, "Event: EV revents=%d\n", revents);
if (EV_ERROR & revents)
{
fprintf(stderr, "invalid event\n");
return;
}
while(1) {
if (0 != zmq_getsockopt(sock, ZMQ_EVENTS, &events, &opt_len))
{
fprintf(stderr, "ZMQ_EVENTS failed\n");
return;
}
fprintf(stderr, "Event: zmq_events=%d\n", events);
if (events & ZMQ_POLLIN)
{
char buffer[50];
fprintf(stderr, "Event: ZMQ_POLLIN\n");
if (0 != zmq_recv(sock, buffer,50 ,ZMQ_NOBLOCK))
{
fprintf(stderr, "zmq_recv failed\n");
return;
}
fprintf(stderr, "RX: %s", buffer);
sending = true;
fprintf(stderr, "Received packet, waiting to send response\n");
}
if (!sending) // we have nothing to write
return;
if (events & ZMQ_POLLOUT)
{
//zmq_msg_t msg;
char *buf = "A reply";
if (0 != zmq_send(sock, buf,7, 0))
{
fprintf(stderr, "zmq_send failed\n");
return;
}
fprintf(stderr, "TX: ");
sending = false;
fprintf(stderr, "Sent packet, waiting for new request\n");
}
}
}
int main(int argc, char *argv[])
{
int rc;
struct ev_loop *loop;
struct ev_io *watcher;
int zmq_fd;
size_t opt_len = sizeof(int);
if (NULL == (watcher = (struct ev_io *)malloc(sizeof(struct ev_io))))
{
fprintf(stderr, "out of memory\n");
return 1;
}
loop = ev_default_loop(0);
zmq_ctx = zmq_init(1);
sock = zmq_socket(zmq_ctx, ZMQ_REP);
if (sock == NULL)
{
fprintf(stderr, "socket!\n");
return 0;
}
rc = zmq_bind(sock, "tcp://*:5555");
if (rc < 0)
{
fprintf(stderr, "bind failed\n");
return 1;
}
if (0 != zmq_getsockopt(sock, ZMQ_FD, (void *)&zmq_fd, &opt_len))
{
fprintf(stderr, "ZMQ_FD failed\n");
return 1;
}
ev_io_init(watcher, watcher_cb, zmq_fd, EV_READ);
ev_io_start(loop, watcher);
while(1)
{
ev_loop(loop, 0);
}
return 0;
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev