Set the HWM before connecting or binding.
On Jun 25, 2013 2:36 AM, "David Cheney" <[email protected]> wrote:

>
> I have a simple "Sender" S.java and "Receiver" R.java using jzmq.  I start
> the receiver, then the sender.  I stop the receiver after it has received
> about 350 messages; the sender reports queuing 1000 messages - so far so
> good.  I restart and quickly stop the receiver.  It reports receiving all
> but the first few queued messages, then the sender blocks once it has
> queued its second batch of 1000.
>
> My attempts to set the HWM seem to have no effect.  The sender stops on
> units of 1000 regardless of what I set the HWM to on either end.  Clearly
> there is something I'm missing.  Thanks in advance!
>
> Platform is KVM under CentOS 6.4 / Linux 2.6.32 64-bit, with Java 1.7.0.
>
>
> *--- Sender ---*
>
> package com.peaxy.zmqtest;
>
> import org.zeromq.ZMQ;
> import org.zeromq.ZMQ.Context;
>
> public class S
> {
>     static       String targetIp  = null;
>     static       int    targetPort = 5558;
>
>     public static void main (String[] args) throws InterruptedException
>     {
>         if (args.length > 0) {
>             targetIp = args[0];
>         } else {
>             System.err.println("usage: S <ip.address>");
>             System.exit(1);
>         }
>
>         Context context = ZMQ.context(1);
>         ZMQ.Socket sender = context.socket(ZMQ.PUSH);
>         System.out.println("sending events to " + targetIp + ":" +
> targetPort);
>         sender.connect("tcp://" + S.targetIp + ":" + S.targetPort);
>         sender.setSndHWM(2000);
>         // sender.setSendTimeOut(50);   // -1 is infinite
>
>         int callerSequenceNum = 1;
>         int unsent = 0;
>         while (callerSequenceNum <= 1000000) {
>             String m = "message " + callerSequenceNum;
>             boolean sent = sender.send(m.getBytes(), 0);
>             if (sent) {
>                 if (unsent > 0) {
>                     System.err.println("could not send message " +
> callerSequenceNum);
>                     unsent = 0;
>                 }
>                 System.out.print(callerSequenceNum + " ");
>                 if ((callerSequenceNum % 25) == 0)
> System.out.println("\n");
>                 Thread.sleep(10);
>             } else {
>                 unsent++;
>                 System.err.println("failed to send " + callerSequenceNum +
> ", missed " + unsent + ", sleeping 10 secs");
>                 Thread.sleep(10000);
>             }
>             callerSequenceNum++;
>         }
>         sender.close();
>         context.term();
>     }
> }
>
> *--- Receiver ---*
>
> package com.peaxy.zmqtest;
>
> import org.zeromq.ZMQ;
>
> public class R {
>     private static final int CPU_CORES = 8;
>
>     public static void main (String[] args) throws InterruptedException
>     {
>         ZMQ.Context context = ZMQ.context(CPU_CORES-1);
>         ZMQ.Socket receiver = context.socket(ZMQ.PULL);
>         receiver.bind("tcp://*:5558");
>         receiver.setRcvHWM(2000);
>         // receiver.setReceiveTimeOut(500);
>         long lastMessage = 0;
>
>         while (lastMessage <= 1000000) {
>             byte[] bytes = receiver.recv(0); // ZMQ.DONTWAIT);
>             if (bytes == null || bytes.length == 0) {
>                 System.out.println("timed out after " +
> receiver.getReceiveTimeOut());
>                 Thread.sleep(10);
>                 continue;
>             }
>             // received a message..
>             String message = new String(bytes);
>             int msgNumber =
> Integer.parseInt(message.substring(message.indexOf(' ')+1));
>             if (msgNumber != lastMessage + 1) {
>                 System.err.println("***** missed " +
> (msgNumber-lastMessage) + " messages");
>             } else {
>                 System.out.println(message);
>             }
>             lastMessage = msgNumber;
>         }
>         receiver.close();
>         context.term();
>     }
> }
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to