Hello Min,
Thank you a lot for update.
Please find attached out test case code excerpt attached.
Your insight would be highly appreciated.
Thanks,
Erwin
AT&T, Senior Software Architect
public class PublisherMeasured extends MeasuredMasterPeer {
static String PUB_PORT = ":5559";
static long PUB_HWM = 1000000;
void initSocket() {
socket = context.socket(ZMQ.PUB);
socket.setLinger(-1);
socket.setHWM(PUB_HWM);
socketUrl = "tcp://*"+ PUB_PORT;
socket.bind(socketUrl);
sysoutForseMsg("Started publisher on "+ socketUrl + ",
syncsytart=" + syncstart);
}
void sendData(byte [] data2send, int chunk, long msgCtr) {
MsgProtocol msgProtocol = new MsgProtocol(socket, data2send,
msgCtr, verbose);
msgProtocol.sendDataMsg(chunk);
}
}
public class SubscriberMeasured extends MeasuredSlavePeer {
void initSocket() {
socket = context.socket(ZMQ.SUB);
socketUrl = "tcp://localhost" + peerPort;
socket.connect(socketUrl);
socket.setRcvHWM(PublisherMeasured.PUB_HWM);
socket.subscribe(MsgProtocol.DATA_MSG_TAG.getBytes());
socket.subscribe(MsgProtocol.END_MSG_TAG.getBytes());
BenchmarkUtils.log("Started subscriber on "+socketUrl);
}
void recieveData() {
MsgProtocol msgProtocol = new MsgProtocol(socket, verbose);
if (MsgProtocol.EOW==msgProtocol.recieveDataMsg()) {
sysoutMsg("Exit, total data messages num="+recvMsgNum);
recvEOW = true;
}
else {
incAccumLatency(msgProtocol.getDataLatency());
recvMsgNum++;
assertAllMsgsArrived(msgProtocol.getSendMsgOrderNum());
}
}
}
public class MsgProtocol {
// The code supports sending data in chuncks, but the benchmark was done for
chunck=0 (send all);
// Verbose flag is set via CLI and is for debugging it was false while
benchmarking
public void sendDataMsg(int chunck) {
envelope.sendtime = System.nanoTime();
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg",
BenchmarkUtils.byteDataSumry2String("Sending
sendtime=" + envelope.sendtime + ",msgCtr=" + envelope.msgCtr + ", data=",
envelope.content));
}
// Write a messages with an envelope and content
socket.send(envelope.header.getBytes(), ZMQ.SNDMORE);
socket.send(Long.toString(envelope.sendtime).getBytes(),
ZMQ.SNDMORE);
socket.send(Long.toString(envelope.msgCtr).getBytes(),
ZMQ.SNDMORE);
if (0==chunck) {
// send the whole message
if ( true == socket.send(envelope.content, 0) ) {
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg", "all send in one message");
}
}
else {
BenchmarkUtils.trace(getClass().getName(),
"sendDataMsg","Failed to send data");
}
}
else {
// Send multi-part message
for (int curPos=0; curPos < envelope.content.length;) {
curPos = sendSubmsg(curPos, envelope.content,
chunck);
}
}
}
public int recieveDataMsg() {
// Read envelope with address
envelope.header = new String(socket.recv(0));
envelope.recieveTime = System.nanoTime();
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg", "Received "+ envelope.header);
}
if (isShutdownMsg(envelope.header)) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg", "Shutdown message arrived");
return EOW;
}
else {
// read sent time
envelope.sendtime=Long.parseLong(new
String(socket.recv(0)));
envelope.msgCtr=Long.parseLong(new String(socket.recv(0)));
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveDataMsg",
"Send time="+envelope.sendtime + ",
recieve time=" + envelope.recieveTime + ",msgCtr=" + envelope.msgCtr);
}
recieveLongData();
}
return 0;
}
private void recieveLongData() {
for (boolean more = socket.hasReceiveMore(); more; more =
socket.hasReceiveMore()) {
// Read message contents
envelope.content = socket.recv(0);
if (verbose) {
BenchmarkUtils.trace(getClass().getName(),
"recieveLongData",
BenchmarkUtils.byteDataSumry2String("recieved ", envelope.content));
}
}
}
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev