Hi,
In the stock trading industry it's pretty common to delay stock quote feeds
so that they can be sold at a discounted price to clients that have no
need for real-time stock quote feeds.
Attached is the source code for a simple application that delays
messages by 10 seconds. It can be tested using the "prompt" and "display"
applications from the "chat" example (examples/chat). Write an instant
message and it gets delivered with a 10-second delay :)
I hope some of you will find this piece of code useful.
Martin
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <zmq.hpp>
// Delay, in whole seconds, that main () adds to a message's arrival
// timestamp before it forwards the message.
#define DELAY 10
// Current time as reported by gettimeofday (), truncated to whole seconds
// (the microsecond part of the reading is discarded).
uint64_t now ()
{
    struct timeval stamp;
    const int status = gettimeofday (&stamp, NULL);
    assert (status == 0);

    const uint64_t seconds = stamp.tv_sec;
    return seconds;
}
// Worker thread to receive messages and push them into internal pipe.
void *receive_routine (void *arg)
{
zmq::context_t *ctx = (zmq::context_t*) arg;
// Socket to read messages from the network.
zmq::socket_t ins (*ctx, ZMQ_SUB);
ins.setsockopt (ZMQ_SUBSCRIBE, "", 0);
ins.bind ("tcp://lo:5555");
// Socket to push message into internal pipe.
zmq::socket_t outs (*ctx, ZMQ_PUB);
outs.connect ("inproc://pipe");
while (true) {
// Receive message.
zmq::message_t msg;
ins.recv (&msg);
// Tag the message with current time.
zmq::message_t msgex (msg.size () + 8);
*(uint64_t*) msgex.data () = now ();
memcpy (((unsigned char*) msgex.data ()) + 8, msg.data (), msg.size ());
// Push it into the pipe (i.e. send it to the main thread).
outs.send (msgex);
}
}
// Entry point: starts the receiving worker thread, then forwards every
// message popped from the internal pipe to the network once DELAY seconds
// have passed since the arrival time recorded in the message's first
// sizeof (uint64_t) bytes. The forwarding loop never exits.
int main ()
{
    zmq::context_t ctx (2, 1);

    // Socket to pop the messages from the internal pipe. It is bound before
    // the worker thread is created; the worker connects to this endpoint.
    zmq::socket_t ins (ctx, ZMQ_SUB);
    ins.setsockopt (ZMQ_SUBSCRIBE, "", 0);
    ins.bind ("inproc://pipe");

    // Socket to send the messages to the network.
    zmq::socket_t outs (ctx, ZMQ_PUB);
    outs.bind ("tcp://lo:5556");

    // Start the worker thread.
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, receive_routine, (void*) &ctx);
    assert (rc == 0);

    // Size of the timestamp that prefixes every message in the pipe.
    const size_t stamp_size = sizeof (uint64_t);

    while (true) {

        // Get next message from the internal pipe.
        zmq::message_t msg;
        ins.recv (&msg);

        // A message shorter than the timestamp cannot be decoded, and
        // subtracting the prefix size from its length would wrap around.
        // Drop it instead.
        if (msg.size () < stamp_size)
            continue;

        // Get the message timestamp. It is copied out with memcpy because
        // data () returns a plain void* with no visible alignment guarantee.
        const unsigned char *src =
            static_cast <const unsigned char*> (msg.data ());
        uint64_t arrived;
        memcpy (&arrived, src, stamp_size);

        // If the delay hasn't elapsed yet, wait. sleep () may return early
        // when a signal is delivered, so the clock is re-checked until the
        // due time has really been reached.
        const uint64_t due = arrived + DELAY;
        for (uint64_t current = now (); current < due; current = now ())
            sleep (static_cast <unsigned int> (due - current));

        // Strip the timestamp from the message.
        zmq::message_t msgraw (msg.size () - stamp_size);
        memcpy (msgraw.data (), src + stamp_size, msgraw.size ());

        // Send the delayed message to the network.
        outs.send (msgraw);
    }
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev