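Set the HWM before connecting or binding. Socket options such as the
send/receive HWM only take effect for connect/bind calls made after the
option is set, so setting it afterwards leaves the default HWM of 1000 in
effect -- which is why the sender always stops on units of 1000.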
On Jun 25, 2013 2:36 AM, "David Cheney" <[email protected]> wrote:
>
> I have a simple "Sender" S.java and "Receiver" R.java using jzmq. I start
> the receiver, then the sender. I stop the receiver after it has received
> about 350 messages; the sender reports queuing 1000 messages - so far so
> good. I restart and quickly stop the receiver. It reports receiving all
> but the first few queued messages, then the sender blocks once it has
> queued its second batch of 1000.
>
> My attempts to set the HWM seem to have no effect. The sender stops on
> units of 1000 regardless of what I set the HWM to on either end. Clearly
> there is something I'm missing. Thanks in advance!
>
> Platform is KVM under CentOS 6.4 / Linux 2.6.32 64 bit, with Java at 1.7.0.
>
>
> *--- Sender ---*
>
> package com.peaxy.zmqtest;
>
> import org.zeromq.ZMQ;
> import org.zeromq.ZMQ.Context;
>
> public class S
> {
> static String targetIp = null;
> static int targetPort = 5558;
>
> public static void main (String[] args) throws InterruptedException
> {
> if (args.length > 0) {
> targetIp = args[0];
> } else {
> System.err.println("usage: S <ip.address>");
> System.exit(1);
> }
>
> Context context = ZMQ.context(1);
> ZMQ.Socket sender = context.socket(ZMQ.PUSH);
> System.out.println("sending events to " + targetIp + ":" +
> targetPort);
> sender.connect("tcp://" + S.targetIp + ":" + S.targetPort);
> sender.setSndHWM(2000);
> // sender.setSendTimeOut(50); // -1 is infinite
>
> int callerSequenceNum = 1;
> int unsent = 0;
> while (callerSequenceNum <= 1000000) {
> String m = "message " + callerSequenceNum;
> boolean sent = sender.send(m.getBytes(), 0);
> if (sent) {
> if (unsent > 0) {
> System.err.println("could not send message " +
> callerSequenceNum);
> unsent = 0;
> }
> System.out.print(callerSequenceNum + " ");
> if ((callerSequenceNum % 25) == 0)
> System.out.println("\n");
> Thread.sleep(10);
> } else {
> unsent++;
> System.err.println("failed to send " + callerSequenceNum +
> ", missed " + unsent + ", sleeping 10 secs");
> Thread.sleep(10000);
> }
> callerSequenceNum++;
> }
> sender.close();
> context.term();
> }
> }
>
> *--- Receiver ---*
>
> package com.peaxy.zmqtest;
>
> import org.zeromq.ZMQ;
>
> public class R {
> private static final int CPU_CORES = 8;
>
> public static void main (String[] args) throws InterruptedException
> {
> ZMQ.Context context = ZMQ.context(CPU_CORES-1);
> ZMQ.Socket receiver = context.socket(ZMQ.PULL);
> receiver.bind("tcp://*:5558");
> receiver.setRcvHWM(2000);
> // receiver.setReceiveTimeOut(500);
> long lastMessage = 0;
>
> while (lastMessage <= 1000000) {
> byte[] bytes = receiver.recv(0); // ZMQ.DONTWAIT);
> if (bytes == null || bytes.length == 0) {
> System.out.println("timed out after " +
> receiver.getReceiveTimeOut());
> Thread.sleep(10);
> continue;
> }
> // received a message..
> String message = new String(bytes);
> int msgNumber =
> Integer.parseInt(message.substring(message.indexOf(' ')+1));
> if (msgNumber != lastMessage + 1) {
> System.err.println("***** missed " +
> (msgNumber-lastMessage) + " messages");
> } else {
> System.out.println(message);
> }
> lastMessage = msgNumber;
> }
> receiver.close();
> context.term();
> }
> }
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev