On 07/27/2013 11:22 AM, Riskybiz wrote:
> It is possible that a different thread could be trying to add to the
> vector while the response is being built but I had hoped that using the
> concurrent vector would eliminate the problem?

No. Your code thinks it's in the last iteration and sends without
SNDMORE - but then the vector grows and it tries to add another frame
afterwards and fails.

I don't know the concurrency library you are using, but maybe reading
the end() iterator just once would help.

Christian

> The server loop is running on a different thread to the rest of the
> application. The ‘for’ loop building the message uses an iterator to
> detect the ‘end’ of the DDV vector so that just as much data as is
> presently available is sent. The vector only grows with push_back(),
> it’s never erased or changed; the only operations on the vector are
> push_back() and iterator dereferencing to fetch the underlying string.
> Also the program functionality is all normal when server starts first,
> so I am not sure that the problem lies in composing the message.
>
> I forgot to mention in my post that the REP socket becomes unresponsive
> to further requests after the ‘Operation cannot be accomplished in
> current state’ error. It can be seen how the client receives all of the
> message up to part 4462. The client then requests element 4463 from the
> vector but never gets a response.
>
> Riskybiz.
>
> _Further sample output:_
>
> [5776] SD_Client: Message: 4458 4422 41481.266667 41481.291667 98.84000 98.61900
> [5776] SD_Client: Message: 4459 4401 41481.260417 41481.291667 98.84000 98.61900
> [5776] SD_Client: Message: 4460 4484 41481.286111 41481.292361 98.84000 98.71100
> [5776] SD_Client: Message: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
> [5776] SD_Client: Message: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
> [5776] SD_Client: Sending Data Request For Element No:4463
> [5776] SD_Client: Waiting For Reply....
> [5776] SD_Client: Waiting For Reply....
> [5776] SD_Client: Waiting For Reply....
> [5776] SD_Client: Waiting For Reply....
>
> *From:*Christian Kamm [mailto:[email protected]]
> *Sent:* 27 July 2013 06:31
> *To:* Riskybiz
> *Cc:* [email protected] <mailto:[email protected]>
> *Subject:* Re: [zeromq-dev] ZeroMQ Error when client starts first,
> server second.
>
>> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
>> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state
>> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400
>
> That looks to me like you are trying to send a message part after the
> final part has been sent. That'd be consistent with the client actually
> getting the finished message and the error message you're getting.
>
> I didn't see any synchronization in your code. Maybe another thread
> grows the DDV vector while the response is being built?
>
> Christian
>
> Riskybiz <[email protected] <mailto:[email protected]>> wrote:
>
> I have a REQ & REP server and client configured so that the client sends
> a request for data beginning from a certain vector index. The server
> then sends messages (multipart if necessary) where each message part
> represents the contents of one vector element. On the first request it
> is common for a multipart message of several thousand parts to be sent
> by REP. Subsequent requests will result in either a ‘nothing further’
> message, a single message or a multipart message of just a few parts.
> The bulk of the data is sent on the first request.
>
> I am experiencing ZMQ behaviour I cannot explain. Please note this is
> my first ZMQ project!
>
> When the server is launched first with the client launched later; then
> all is well. The large multipart message goes through and is received
> properly, subsequent smaller transmissions can then be seen as expected.
> I am careful that only one request is sent from a client at a time, the
> client will wait for a reply so that the lock step REQ – REP routine is
> maintained.
>
> HOWEVER if the CLIENT is launched FIRST and the SERVER a few (maybe 10)
> seconds LATER then an error becomes apparent.
>
> The first request from REQ results in a large multipart message being
> sent from the server REP. The sample output below shows the ‘FinalSend’
> from the server when part 4462 of a large multipart message is sent.
> Then the error ‘Operation cannot be accomplished in current state’ is
> shown. This error causes an inelegant exit by my application.
>
> After the server error the client can be seen processing the large
> multipart message as it is received beginning with part 0, 1, 2, 3 etc….
>
> Now; I am not sure how to correct this problem. Looking at the patterns
> of behaviour for REQ & REP sockets does not give a direct answer other
> than a suspicion that a high-water-mark issue could be present? Or
> perhaps there is some socket option that needs resetting or has got stuck?
>
> Any tips or help much appreciated. It’s really important to my
> application that the server & client can launch, bind & connect in any
> order.
>
> With thanks,
>
> Riskybiz.
>
> _Sample Output:_
>
> [5100] TestDataAccess: ZMQServer: SendMore: 4453 4457 41481.277778 41481.290278 98.84000 98.75000
> [5100] TestDataAccess: ZMQServer: SendMore: 4454 4479 41481.284722 41481.290972 98.84000 98.76900
> [5100] TestDataAccess: ZMQServer: SendMore: 4455 4482 41481.285417 41481.291667 98.84000 98.74400
> [5100] TestDataAccess: ZMQServer: SendMore: 4456 4462 41481.279167 41481.291667 98.84000 98.74400
> [5100] TestDataAccess: ZMQServer: SendMore: 4457 4442 41481.272917 41481.291667 98.84000 98.66400
> [5100] TestDataAccess: ZMQServer: SendMore: 4458 4422 41481.266667 41481.291667 98.84000 98.61900
> [5100] TestDataAccess: ZMQServer: SendMore: 4459 4401 41481.260417 41481.291667 98.84000 98.61900
> [5100] TestDataAccess: ZMQServer: SendMore: 4460 4484 41481.286111 41481.292361 98.84000 98.71100
> [5100] TestDataAccess: ZMQServer: SendMore: 4461 4487 41481.286806 41481.293056 98.84000 98.70200
> [5100] TestDataAccess: ZMQServer: FinalSend: 4462 4466 41481.280556 41481.293056 98.84000 98.70200
> [5100] TestDataAccess: ZMQServer: Error In SendMore: Operation cannot be accomplished in current state
> [5776] SD_Client: Message: 0 1 41479.919444 41479.926389 100.29400 100.23900
> [5776] SD_Client: Message: 1 4 41479.920139 41479.927083 100.29400 100.21900
> [5776] SD_Client: Message: 2 6 41479.920833 41479.927778 100.29400 100.18000
> [5776] SD_Client: Message: 3 9 41479.921528 41479.928472 100.29400 100.18000
> [5776] SD_Client: Message: 4 10 41479.922222 41479.929167 100.29400 100.18000
> [5776] SD_Client: Message: 5 13 41479.922917 41479.929861 100.29400 100.18000
> [5776] SD_Client: Message: 6 15 41479.923611 41479.930556 100.29400 100.18000
> [5776] SD_Client: Message: 7 18 41479.924306 41479.931250 100.28700 100.18000
> [5776] SD_Client: Message: 8 22 41479.925694 41479.931944 100.28700 100.18000
> [5776] SD_Client: Message: 9 2 41479.919444 41479.931944 100.29400 100.18000
> [5776] SD_Client: Message: 10 23 41479.926389 41479.932639 100.24900 100.18000
>
> _Server Code:_
>
> int DataServer()
> {
>     //Prepare the context
>     void *context = zmq_ctx_new();
>     if(context == NULL)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Error Creating Context: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;//error
>     }
>
>     //Set
>     bool runLoop(true);
>
>     //Prepare the socket
>     void *server = zmq_socket(context, ZMQ_REP);
>     if(server == NULL)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Error Creating REP socket: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;//error
>     }
>
>     // Configure socket to not wait at close time
>     int linger = 0;
>     int rc = zmq_setsockopt(server, ZMQ_LINGER, &linger, sizeof (linger));
>     if(rc == -1)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Setting Socket Options Failed: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;
>     }
>
>     rc = zmq_bind(server, "tcp://*:5555");
>     if(rc == -1)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Bind To REP Failed: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;
>     }
>
>     OutputDebugStringA("TestDataAccess: ZMQServer: Data Server Ready....");
>     //Listen for request
>     while(runLoop)
>     {
>         OutputDebugStringA("TestDataAccess: ZMQServer: Waiting For A Request....");
>         //Wait for a request
>         zmq_msg_t request;
>         int rc = zmq_msg_init(&request);
>         if(rc == -1)
>         {
>             std::string errStr = zmq_strerror(zmq_errno());
>             std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>             const char* errOut = errConc.c_str();
>             OutputDebugStringA(errOut);
>         }
>
>         rc = zmq_msg_recv(&request, server, 0);
>         if(rc == -1)
>         {
>             std::string errStr = zmq_strerror(zmq_errno());
>             std::string errConc = "TestDataAccess: ZMQServer: Error Receiving Message: " + errStr;
>             const char* errOut = errConc.c_str();
>             OutputDebugStringA(errOut);
>
>             //Release Message
>             rc = zmq_msg_close(&request);
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>                 return 1;//error
>             }
>             return 1;//error
>         }
>         OutputDebugStringA("TestDataAccess: ZMQServer: Processing Request....");
>
>         //Cast the request data to a string
>         std::string reqStr(static_cast<char*>(zmq_msg_data(&request)), zmq_msg_size(&request));
>
>         unsigned int startFromElement;
>
>         if(reqStr == "kill")
>         {
>             //Send reply
>             std::string outMsg = "OK killing";
>             zmq_msg_t messageOut;
>
>             int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>             }
>
>             memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>             rc = zmq_msg_send(&messageOut, server, 0);
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Sending Shutdown Acknowledgement: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>                 return 1;//error
>                 //Release Message
>                 rc = zmq_msg_close(&messageOut);
>                 if(rc == -1)
>                 {
>                     std::string errStr = zmq_strerror(zmq_errno());
>                     std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                     const char* errOut = errConc.c_str();
>                     OutputDebugStringA(errOut);
>                     return 1;//error
>                 }
>             }
>
>             //Release Message
>             rc = zmq_msg_close(&request);
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Received Message: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>                 return 1;//error
>             }
>             OutputDebugStringA("TestDataAccess: ZMQServer: Shutting Down Server....");
>             runLoop = false;
>             break;
>         }
>         else
>         {
>             //Convert the request string to an integer
>             if ( ! (std::istringstream(reqStr) >> startFromElement) ) startFromElement = 0;//checks that the string actually contains numeric characters; returns 0 if not;
>         }
>
>         //Prepare the reply
>         if (startFromElement >= DDV->size() || DDV->size() == 0)
>         {
>             //Client already has all the data
>             std::string outMsg = reqStr + " Nothing Further";
>             zmq_msg_t messageOut;
>
>             int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>             }
>
>             memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>             rc = zmq_msg_send(&messageOut, server, 0);
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Sending 'Nothing Further': " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>                 return 1;//error
>             }
>             else if(rc >= 0)
>             {
>                 std::string outStr = "TestDataAccess: ZMQServer: Message: " + reqStr + " Nothing Further";
>                 const char* msgOut = outStr.c_str();
>                 OutputDebugStringA(msgOut);
>             }
>
>             //Release Message
>             rc = zmq_msg_close(&messageOut);
>             if(rc == -1)
>             {
>                 std::string errStr = zmq_strerror(zmq_errno());
>                 std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                 const char* errOut = errConc.c_str();
>                 OutputDebugStringA(errOut);
>                 return 1;//error
>             }
>
>         }
>         else
>         {
>
>             //There is data to send
>             for(auto iter = DDV->begin() + startFromElement; iter != DDV->end(); iter++)
>             {
>                 if(iter + 1 != DDV->end())
>                 {
>                     std::string elemStr = static_cast<std::ostringstream*>( &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
>                     std::string outMsg = elemStr + " " + *iter;
>
>                     //SendMore
>                     zmq_msg_t messageOut;
>
>                     int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>                     }
>
>                     memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>                     rc = zmq_msg_send(&messageOut, server, ZMQ_SNDMORE);
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error In SendMore: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>                         return 1;//error
>
>                         //Release Message
>                         rc = zmq_msg_close(&messageOut);
>                         if(rc == -1)
>                         {
>                             std::string errStr = zmq_strerror(zmq_errno());
>                             std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                             const char* errOut = errConc.c_str();
>                             OutputDebugStringA(errOut);
>                             return 1;//error
>                         }
>                     }
>                     else if(rc >= 0)
>                     {
>                         std::string outStr = "TestDataAccess: ZMQServer: SendMore: " + elemStr + " " + *iter;
>                         const char* msgOut = outStr.c_str();
>                         OutputDebugStringA(msgOut);
>                     }
>
>                     //Release Message
>                     rc = zmq_msg_close(&messageOut);
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>                         return 1;//error
>                     }
>
>                 }
>                 else if(iter + 1 == DDV->end())
>                 {
>                     std::string elemStr = static_cast<std::ostringstream*>( &(std::ostringstream() << distance(DDV->begin(),iter)))->str();
>                     std::string outMsg = elemStr + " " + *iter;
>                     //Final Send
>                     zmq_msg_t messageOut;
>
>                     int rc = zmq_msg_init_size(&messageOut, outMsg.size());
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error Initialising Message: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>                     }
>
>                     memcpy (zmq_msg_data(&messageOut), outMsg.data(), outMsg.size());
>
>                     rc = zmq_msg_send(&messageOut, server, 0);
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error In FinalSend: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>
>                         //Release Message
>                         rc = zmq_msg_close(&messageOut);
>                         if(rc == -1)
>                         {
>                             std::string errStr = zmq_strerror(zmq_errno());
>                             std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                             const char* errOut = errConc.c_str();
>                             OutputDebugStringA(errOut);
>                             return 1;//error
>                         }
>                         return 1;//error
>                     }
>                     else if(rc >= 0)
>                     {
>                         std::string outStr = "TestDataAccess: ZMQServer: FinalSend: " + elemStr + " " + *iter;
>                         const char* msgOut = outStr.c_str();
>                         OutputDebugStringA(msgOut);
>                     }
>
>                     //Release Message
>                     rc = zmq_msg_close(&messageOut);
>                     if(rc == -1)
>                     {
>                         std::string errStr = zmq_strerror(zmq_errno());
>                         std::string errConc = "TestDataAccess: ZMQServer: Error Releasing Sent Message: " + errStr;
>                         const char* errOut = errConc.c_str();
>                         OutputDebugStringA(errOut);
>                         return 1;//error
>                     }
>
>                 }
>             }//for
>         }//else
>     }//while
>
>     //Close Server Socket
>     rc = zmq_close(server);
>     if(rc == -1)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Error Closing Server Socket: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;//error
>     }
>
>     //When you exit the program, close your sockets and then call zmq_ctx_destroy(). This destroys the context.
>     rc = zmq_ctx_destroy(context);
>     if(rc == -1)
>     {
>         std::string errStr = zmq_strerror(zmq_errno());
>         std::string errConc = "TestDataAccess: ZMQServer: Error Destroying Context: " + errStr;
>         const char* errOut = errConc.c_str();
>         OutputDebugStringA(errOut);
>         return 1;//error
>     }
>
>     OutputDebugStringA("TestDataAccess: ZMQServer: Server Shut Down.");
>     return 0;
> }
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
