Hello Min,
Ay help please about our following issue, I've attached our testing source
code.
We're in the getting decision phase for Messaging platform and your
assistance highly appreciated.
We've found from the documentation that unlimited HWM is zero and not -1.
Thank you a lot for update.
Please find attached out test case code excerpt attached.
Your insight would be highly appreciated.
Thanks,
Erwin
AT&T, Senior Software Architect
public class PublisherMeasured extends MeasuredMasterPeer {
static String PUB_PORT = ":5559";
static long PUB_HWM = 1000000;
void initSocket() {
socket = context.socket(ZMQ.PUB);
socket.setLinger(-1);
socket.setHWM(PUB_HWM);
socketUrl = "tcp://*"+ PUB_PORT;
socket.bind(socketUrl);
sysoutForseMsg("Started publisher on "+ socketUrl + ",
syncsytart=" + syncstart);
}
void sendData(byte [] data2send, int chunk, long msgCtr) {
MsgProtocol msgProtocol = new MsgProtocol(socket, data2send,
msgCtr, verbose);
msgProtocol.sendDataMsg(chunk);
}
}
public class SubscriberMeasured extends MeasuredSlavePeer {
void initSocket() {
socket = context.socket(ZMQ.SUB);
socketUrl = "tcp://localhost" + peerPort;
socket.connect(socketUrl);
socket.setRcvHWM(PublisherMeasured.PUB_HWM);
socket.subscribe(MsgProtocol.DATA_MSG_TAG.getBytes());
socket.subscribe(MsgProtocol.END_MSG_TAG.getBytes());
BenchmarkUtils.log("Started subscriber on "+socketUrl);
}
void recieveData() {
MsgProtocol msgProtocol = new MsgProtocol(socket, verbose);
if (MsgProtocol.EOW==msgProtocol.recieveDataMsg()) {
sysoutMsg("Exit, total data messages num="+recvMsgNum);
recvEOW = true;
}
else {
incAccumLatency(msgProtocol.getDataLatency());
recvMsgNum++;
assertAllMsgsArrived(msgProtocol.getSendMsgOrderNum());
}
}
}
public class MsgProtocol {
// The code supports sending data in chuncks, but the benchmark was done for
chunck=0 (send all);
// Verbose flag is set via CLI and is for debugging it was false while
benchmarking
public void sendDataMsg(int chunck) {
envelope.sendtime = System.nanoTime();
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg",
BenchmarkUtils.byteDataSumry2String("Sending
sendtime=" + envelope.sendtime + ",msgCtr=" + envelope.msgCtr + ", data=",
envelope.content));
}
// Write a messages with an envelope and content
socket.send(envelope.header.getBytes(), ZMQ.SNDMORE);
socket.send(Long.toString(envelope.sendtime).getBytes(),
ZMQ.SNDMORE);
socket.send(Long.toString(envelope.msgCtr).getBytes(),
ZMQ.SNDMORE);
if (0==chunck) {
// send the whole message
if ( true == socket.send(envelope.content, 0) ) {
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg", "all send in one message");
}
}
else {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg","Failed to send data");
}
}
else {
// Send multi-part message
for (int curPos=0; curPos < envelope.content.length;) {
curPos = sendSubmsg(curPos, envelope.content,
chunck);
}
}
}
public int recieveDataMsg() {
// Read envelope with address
envelope.header = new String(socket.recv(0));
envelope.recieveTime = System.nanoTime();
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg", "Received "+ envelope.header);
}
if (isShutdownMsg(envelope.header)) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg", "Shutdown message arrived");
return EOW;
}
else {
// read sent time
envelope.sendtime=Long.parseLong(new
String(socket.recv(0)));
envelope.msgCtr=Long.parseLong(new String(socket.recv(0)));
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg",
"Send time="+envelope.sendtime + ",
recieve time=" + envelope.recieveTime + ",msgCtr=" + envelope.msgCtr);
}
recieveLongData();
}
return 0;
}
private void recieveLongData() {
for (boolean more = socket.hasReceiveMore(); more; more =
socket.hasReceiveMore()) {
// Read message contents
envelope.content = socket.recv(0);
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveLongData",
BenchmarkUtils.byteDataSumry2String("recieved ", envelope.content));
}
}
}
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev