Hello Joshua,

Thanks for your response. I do not care when the subscriber gets the data, because the sequence from my publisher is infinite, meaning it has no start and no end.
How do I close this previous queue? Even if I stop and restart the publisher, does it still maintain the state of the previously connected subscriber and its queue? Any suggestion on how to solve this would be very helpful.

Thanks,
Janak

________________________________
From: Joshua Foster <[email protected]>
To: ZeroMQ development list <[email protected]>
Sent: Thu, November 11, 2010 1:19:27 PM
Subject: Re: [zeromq-dev] Question about the PUBSUB design pattern when server restarts.

If you want the subscriber to pick up where it last left off, be sure to set the identity. The publisher is probably creating a new queue with the "second" subscriber, but leaving the previous queue open. The previous queue is the one that continues to grow (causing memory to increase).

Joshua

On 11/11/2010 1:53 PM, Janak Raja wrote:
> Hello,
>
> I recently came across ZeroMQ and wanted to understand what the
> behavior of the PUBSUB design pattern will be when the PUB goes down
> for more than 2 minutes.
>
> I tried the wu example with a small modification to send packets
> continuously. Please let me know what I am doing wrong.
>
> Do I have to call connect from the client side after every s_recv
> call? Currently, I see that if I don't call connect on the SUB, and
> the PUB stops and starts again 2 minutes later, memory usage on the
> PUB keeps increasing. Maybe that is because it is keeping the data in
> a queue for the SUB to receive.
>
> Please advise,
>
> Here is the Publisher code:
>
> //
> //  Weather update server
> //  Binds PUB socket to tcp://*:5556
> //  Publishes random weather updates
> //
> #include "zhelpers.h"
>
> int main () {
>     //  Prepare our context and publisher
>     void *context = zmq_init (1);
>     void *publisher = zmq_socket (context, ZMQ_PUB);
>     zmq_bind (publisher, "tcp://*:5556");
>     zmq_bind (publisher, "ipc://weather.ipc");
>
>     //  Initialize random number generator
>     srandom ((unsigned) time (NULL));
>     while (1) {
>         //  Get values that will fool the boss
>         int zipcode, temperature, relhumidity;
>         zipcode     = within (100000);
>         temperature = within (215) - 80;
>         relhumidity = within (50) + 10;
>
>         //  Send message to all subscribers
>         char update [20];
>         sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
>         s_send (publisher, update);
>     }
>     zmq_close (publisher);
>     zmq_term (context);
>     return 0;
> }
>
> Subscriber code:
>
> //
> //  Weather update client
> //  Connects SUB socket to tcp://localhost:5556
> //  Collects weather updates and finds avg temp in zipcode
> //
> #include "zhelpers.h"
>
> int main (int argc, char *argv[])
> {
>     void *context = zmq_init (1);
>
>     //  Socket to talk to server
>     printf ("Collecting updates from weather server...\n");
>     void *subscriber = zmq_socket (context, ZMQ_SUB);
>     zmq_connect (subscriber, "tcp://localhost:5556");
>
>     //  Subscribe to zipcode, default is NYC, 10001
>     char *filter = (argc > 1)? argv [1]: "10001 ";
>     zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
>     while (1)
>     {
>         //  Process 100 updates
>         int update_nbr;
>         long total_temp = 0;
>         for (update_nbr = 0; update_nbr < 100; update_nbr++) {
>             char *string = s_recv (subscriber);
>             int zipcode, temperature, relhumidity;
>             sscanf (string, "%d %d %d",
>                 &zipcode, &temperature, &relhumidity);
>             total_temp += temperature;
>             free (string);
>         }
>         printf ("Average temperature for zipcode '%s' was %dF\n",
>             filter, (int) (total_temp / update_nbr));
>     }
>     zmq_close (subscriber);
>     zmq_term (context);
>     return 0;
> }
>
> Thanks,
> Janak
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
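To make the "previous queue keeps growing" explanation in this thread concrete, here is a minimal sketch in plain C. It is not libzmq code and does not claim to show how the library is implemented: sub_queue_t, sub_queue_new, sub_queue_push, sub_queue_pop and publish_to_stalled are hypothetical names invented for this illustration. The sketch only counts messages in one per-subscriber queue and compares an unbounded queue with one capped by a high-water mark, which is a different technique from setting the identity as suggested above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one outbound queue that a publisher keeps for a
   single subscriber. It only counts messages; it is NOT libzmq code. */
typedef struct {
    size_t depth;    /* messages currently waiting for this subscriber */
    size_t hwm;      /* high-water mark; 0 means "no limit"            */
    size_t dropped;  /* messages discarded because the queue was full  */
} sub_queue_t;

/* Create an empty queue with the given cap (0 = unbounded). */
static sub_queue_t sub_queue_new (size_t hwm)
{
    sub_queue_t q = { 0, hwm, 0 };
    return q;
}

/* Publisher side: try to enqueue one message.
   Returns true if it was queued, false if it was dropped. */
static bool sub_queue_push (sub_queue_t *q)
{
    assert (q != NULL);
    if (q->hwm != 0 && q->depth >= q->hwm) {
        q->dropped++;
        return false;
    }
    q->depth++;
    return true;
}

/* Subscriber side: consume one message if any is waiting. */
static bool sub_queue_pop (sub_queue_t *q)
{
    assert (q != NULL);
    if (q->depth == 0)
        return false;
    q->depth--;
    return true;
}

/* Publish `count` messages to a subscriber that never reads.
   Returns the queue depth afterwards. */
static size_t publish_to_stalled (sub_queue_t *q, size_t count)
{
    for (size_t i = 0; i < count; i++)
        sub_queue_push (q);
    return q->depth;
}
```

With hwm set to 0, the depth equals the number of messages published to a subscriber that never reads, which matches the memory growth described above. With a cap, the depth stops at the cap and the remaining messages are counted as dropped. In the ZeroMQ 2.x API the corresponding knob should be the ZMQ_HWM socket option set through zmq_setsockopt, but option names and types differ between library versions, so check the zmq_setsockopt man page for the version you have installed.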
