Hello Min,

Thank you very much for the update.

Please find our test case code excerpt attached.

Your insight would be highly appreciated.

Thanks,
Erwin
AT&T, Senior Software Architect
public class PublisherMeasured extends MeasuredMasterPeer {
       // Port suffix (leading colon included) appended to "tcp://*" to form the
       // publisher's bind URL in initSocket().
       static String PUB_PORT = ":5559";
       // High-water mark handed to setHWM() on the publisher socket;
       // SubscriberMeasured passes the same value to setRcvHWM().
       static long PUB_HWM = 1000000;

       …

       void initSocket() {
              socket = context.socket(ZMQ.PUB); 
              socket.setLinger(-1);
              socket.setHWM(PUB_HWM);
              socketUrl = "tcp://*"+ PUB_PORT;
              socket.bind(socketUrl);    
              sysoutForseMsg("Started publisher on "+ socketUrl + ", 
syncsytart=" + syncstart);
       }

       /**
        * Wraps the payload in a fresh {@code MsgProtocol} bound to this
        * publisher's socket and sends it.
        *
        * @param data2send payload bytes handed to the protocol object
        * @param chunk     chunk argument forwarded to {@code sendDataMsg}
        * @param msgCtr    message counter handed to the protocol object
        */
       void sendData(byte [] data2send, int chunk, long msgCtr) {
              new MsgProtocol(socket, data2send, msgCtr, verbose).sendDataMsg(chunk);
       }
…
}

public class SubscriberMeasured extends MeasuredSlavePeer {
…
       /**
        * Creates the SUB socket, configures its receive high-water mark,
        * connects to the publisher on localhost and subscribes to the data and
        * end-of-work message tags.
        */
       void initSocket() {
              socket = context.socket(ZMQ.SUB);
              // The receive HWM must be configured BEFORE connect(): per the
              // ZeroMQ zmq_setsockopt documentation, HWM options only take
              // effect for subsequent bind/connect calls. Setting it after
              // connect() left this connection on the default HWM.
              socket.setRcvHWM(PublisherMeasured.PUB_HWM);
              socketUrl = "tcp://localhost" + peerPort;
              socket.connect(socketUrl);
              // Subscriptions may be changed at any time, so they stay after connect().
              socket.subscribe(MsgProtocol.DATA_MSG_TAG.getBytes());
              socket.subscribe(MsgProtocol.END_MSG_TAG.getBytes());
              BenchmarkUtils.log("Started subscriber on "+socketUrl);
       }

       /**
        * Receives one message through a fresh {@code MsgProtocol}.
        *
        * If the protocol reports {@code MsgProtocol.EOW}, the total is printed
        * and {@code recvEOW} is raised. Otherwise the message's latency is
        * accumulated, the received-message counter is incremented and the
        * sender's order number is passed to {@code assertAllMsgsArrived}.
        */
       void recieveData() {
              MsgProtocol protocol = new MsgProtocol(socket, verbose);
              int status = protocol.recieveDataMsg();

              if (status == MsgProtocol.EOW) {
                     sysoutMsg("Exit, total data messages num="+recvMsgNum);
                     recvEOW = true;
                     return;
              }

              incAccumLatency(protocol.getDataLatency());
              recvMsgNum++;
              assertAllMsgsArrived(protocol.getSendMsgOrderNum());
       }
…
}
       
public class MsgProtocol {
…
// The code supports sending data in chunks, but the benchmark was done for
// chunck=0 (send all).
// The verbose flag is set via CLI and is for debugging – it was false while
// benchmarking.
       public void sendDataMsg(int chunck) {
              envelope.sendtime = System.nanoTime();
              if (verbose)  {
                     BenchmarkUtils.trace(getClass().getName(),
                                  "sendDataMsg", 
                                  BenchmarkUtils.byteDataSumry2String("Sending 
sendtime=" + envelope.sendtime + ",msgCtr=" + envelope.msgCtr + ", data=", 
envelope.content));
              }
              // Write a messages with an envelope and content      
              socket.send(envelope.header.getBytes(), ZMQ.SNDMORE);
              socket.send(Long.toString(envelope.sendtime).getBytes(), 
ZMQ.SNDMORE);
              socket.send(Long.toString(envelope.msgCtr).getBytes(), 
ZMQ.SNDMORE);
              if (0==chunck) {
                     // send the whole message
                     if ( true == socket.send(envelope.content, 0) ) {
                           if (verbose) {
                                  BenchmarkUtils.trace(getClass().getName(), 
"sendDataMsg", "all send in one message");
                           }
                     }
                     else {
                           BenchmarkUtils.trace(getClass().getName(), 
"sendDataMsg","Failed to send data");
                     }
              }
              else {
                     // Send multi-part message
                     for (int curPos=0; curPos < envelope.content.length;) {
                           curPos = sendSubmsg(curPos, envelope.content, 
chunck);
                     }
              }
       }
       
       public int recieveDataMsg() {            
              // Read envelope with address      
              envelope.header = new String(socket.recv(0));
              envelope.recieveTime = System.nanoTime();
              if (verbose) {
                     BenchmarkUtils.trace(getClass().getName(), 
"recieveDataMsg", "Received "+ envelope.header); 
              }
              if (isShutdownMsg(envelope.header)) {
                     BenchmarkUtils.trace(getClass().getName(), 
"recieveDataMsg", "Shutdown message arrived");   
                     return EOW;
              }
              else {
                     // read sent time
                     envelope.sendtime=Long.parseLong(new 
String(socket.recv(0)));
                     envelope.msgCtr=Long.parseLong(new String(socket.recv(0)));
                     if (verbose) {
                           BenchmarkUtils.trace(getClass().getName(),
                                         "recieveDataMsg",                      
                
                                         "Send time="+envelope.sendtime + ", 
recieve time=" + envelope.recieveTime + ",msgCtr=" + envelope.msgCtr);   
                     }
                     recieveLongData();
              }
              return 0;
       }
       
       /**
        * Reads every remaining frame of the current multi-part message into
        * {@code envelope.content}.
        *
        * A single remaining frame is stored as-is, with no copy. When the
        * content arrives in several frames they are concatenated in arrival
        * order; the previous code overwrote the content with each frame, so
        * only the last chunk survived. If no frame remains, the content is
        * left untouched.
        */
       private void recieveLongData() {
              byte[] firstFrame = null;
              int frameCount = 0;
              // Allocated lazily so the single-frame path stays copy-free.
              java.io.ByteArrayOutputStream joined = null;

              while (socket.hasReceiveMore()) {
                     // Read message contents
                     byte[] frame = socket.recv(0);
                     if (verbose) {
                            BenchmarkUtils.trace(getClass().getName(),
                                          "recieveLongData",
                                          BenchmarkUtils.byteDataSumry2String("recieved ", frame));
                     }

                     frameCount++;
                     if (frameCount == 1) {
                            firstFrame = frame;
                            continue;
                     }
                     if (joined == null) {
                            joined = new java.io.ByteArrayOutputStream();
                            if (firstFrame != null) {
                                   joined.write(firstFrame, 0, firstFrame.length);
                            }
                     }
                     if (frame != null) {
                            joined.write(frame, 0, frame.length);
                     }
              }

              if (joined != null) {
                     envelope.content = joined.toByteArray();
              }
              else if (frameCount == 1) {
                     envelope.content = firstFrame;
              }
       }
…
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to