Hi list,
In our code we use a simple PUB/SUB scheme: A publisher is sending data
over a PUB socket and a subscriber is getting that data using a SUB socket.
Today I tested the code and (especially with a lot of messages) after
some time the subscriber hangs in socket.recv(). The publisher happily
goes on sending messages and does not return an error. The subscriber,
however, does not get any of those messages and does not report an error either.
I simplified the code we are using and attached it to this email.
Sometimes the problem appears after 20 seconds and sometimes after 400
but eventually it happens.
I am using ZeroMQ 2.0.10 and the Java bindings on Mac OS X 10.6.4.
Best,
Oliver
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
 * Minimal ZeroMQ subscriber used to reproduce a stalling PUB/SUB stream.
 *
 * <p>The main thread connects a SUB socket to the address given on the command line, subscribes
 * to every message and counts what it receives. A background thread prints the running total
 * every ten seconds, or a warning when no new message arrived during the last interval.
 */
public final class Client {

    /** Pause between two statistics reports, in milliseconds. */
    private static final long REPORT_INTERVAL_MILLIS = 10000;

    /**
     * Number of messages received so far.
     *
     * <p>Written only by the receiving (main) thread and read by the statistics thread. It is
     * {@code volatile} so that the reader always observes the latest value and never a torn
     * 64-bit read; the non-atomic {@code ++} is safe because there is exactly one writer.
     */
    private static volatile long numMessages;

    /** Periodically reports the message count, or a stall when the count stopped changing. */
    private static final Thread statisticsThread = new Thread(new Runnable() {
        private final long startTime = System.currentTimeMillis();
        private long previousNumMessages;

        @Override
        public void run() {
            System.out.println("Num. Msgs received");
            while (true) {
                try {
                    Thread.sleep(REPORT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: keep the flag set and end the thread.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Read the counter once so the value compared, printed and stored is the same.
                final long current = numMessages;
                if (previousNumMessages == current) {
                    System.out.println("No new messages arrived! Time: "
                            + (System.currentTimeMillis() - startTime));
                } else {
                    System.out.println(current);
                    previousNumMessages = current;
                }
            }
        }
    });

    private Client() {
        // Not instantiable: this class only hosts the program entry point.
    }

    /**
     * Subscribes to the given endpoint and counts incoming messages until the process is killed.
     *
     * @param args exactly one element: the ZeroMQ address to connect to
     */
    public static void main(final String[] args) {
        if (args.length != 1) {
            System.err.println("Give address to subscribe to.");
            // A usage error is a failure, so signal it with a non-zero exit status.
            System.exit(1);
        }

        // start statistics thread; daemon so it cannot keep the JVM alive if receiving fails
        statisticsThread.setDaemon(true);
        statisticsThread.start();

        // get messages
        Context context = ZMQ.context(1);
        Socket subSocket = context.socket(ZMQ.SUB);
        subSocket.connect(args[0]);
        // An empty prefix subscribes to every message.
        subSocket.subscribe(new byte[0]);
        while (true) {
            byte[] rawMsg = subSocket.recv(0);
            if (rawMsg == null) {
                System.err.println("Received message was null.");
                // Nothing was received, so it must not show up in the statistics.
                continue;
            }
            numMessages++;
        }
    }
}
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
 * Minimal ZeroMQ publisher that sends a fixed message at a target rate.
 *
 * <p>The thread binds a PUB socket to the configured address and sends the same payload in an
 * endless loop. After every half-second's worth of messages it sleeps for whatever is left of
 * that half second, so the average rate approaches the requested messages per second.
 * Interrupting the thread ends the loop.
 */
public final class Publisher extends Thread {

    private static final int MESSAGES_PER_SECOND = 100000;

    /** Length of one throttling window in milliseconds (half a second). */
    private static final long WINDOW_MILLIS = 500;

    /** Number of messages to send per throttling window; always at least one. */
    private final int halfMessagesPerSecond;

    private final String address;

    /** How much longer than requested the previous sleep took, in milliseconds. */
    private long sleepOverhead;

    /**
     * Starts a publisher on the address given as the first argument.
     *
     * @param args at least one element: the ZeroMQ address to bind to
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Specify address to publish to.");
            // A usage error is a failure, so signal it with a non-zero exit status.
            System.exit(1);
        }
        Publisher publisher = new Publisher(MESSAGES_PER_SECOND, args[0]);
        publisher.start();
    }

    /**
     * Creates a publisher; call {@link #start()} to begin sending.
     *
     * @param messagesPerSecond target send rate; must be positive
     * @param address ZeroMQ endpoint the PUB socket binds to
     * @throws IllegalArgumentException if {@code messagesPerSecond} is not positive
     */
    public Publisher(int messagesPerSecond, String address) {
        if (messagesPerSecond <= 0) {
            throw new IllegalArgumentException(
                    "messagesPerSecond must be positive: " + messagesPerSecond);
        }
        this.address = address;
        // Integer halving, clamped to 1: a threshold of 0 would never be reached by the
        // counter in run() and would silently disable throttling.
        this.halfMessagesPerSecond = Math.max(1, messagesPerSecond / 2);
    }

    /** Binds the PUB socket and publishes at the target rate until the thread is interrupted. */
    @Override
    public void run() {
        Context context = ZMQ.context(1);
        Socket pubSocket = context.socket(ZMQ.PUB);
        pubSocket.bind(address);

        // The payload never changes, so encode it once instead of once per message.
        final byte[] payload = "This is the message".getBytes();

        int msgsSent = 0;
        long startTime = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
            // process
            boolean success = pubSocket.send(payload, 0);
            if (!success) {
                System.err.println("Error while sending message.");
            }
            msgsSent++;

            // wait
            if (msgsSent >= halfMessagesPerSecond) {
                correctRate(System.currentTimeMillis() - startTime);
                msgsSent = 0;
                startTime = System.currentTimeMillis();
            }
        }
    }

    /**
     * Sleeps for the remainder of the current half-second window so that the target msgs/s rate
     * is achieved. Called every time half of the messages that are supposed to be sent per
     * second have been sent.
     *
     * <p>The measured oversleep of the previous call is subtracted from the next sleep to
     * compensate for timer inaccuracy.
     *
     * @param time milliseconds it took to send the messages of the current window
     */
    private void correctRate(long time) {
        long sleepTime = WINDOW_MILLIS - time - sleepOverhead;
        if (sleepTime <= 0) {
            // Sending already used up the whole window; there is nothing left to wait for.
            return;
        }
        long startSleep = System.currentTimeMillis();
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            System.err.println("Thread was interrupted while sleeping: " +
                    e.toString());
            // Keep the interrupt visible so the send loop in run() terminates.
            Thread.currentThread().interrupt();
            return;
        }
        // Oversleep cannot be negative; clamp in case the clock resolution says otherwise.
        sleepOverhead = Math.max(0, (System.currentTimeMillis() - startSleep) - sleepTime);
    }
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev