Replied inline. On Sat, Mar 7, 2015 at 8:33 AM, <[email protected]> wrote:
> Oops ... I overlooked that there's no time between connect and send in
> the client ;o)
>
> Please excuse this silly thing.
>
> Result: I still don't have an easy test case for my vanished messages
> ;o((((((
>
> On 2015-03-07 13:41, Sven Koebnick wrote:
>
> Thanks for the fast response ;-)
> Did you notice that all workers get the same first message? The bug should
> be in the router or the client (main), I assume.
>
> Is there some way to ask the socket if the connection has been
> established? I didn't find one.
>

No. The canonical way to make sure messages get through is to have the server, or whichever side binds, already running and bound before the other side calls connect. Then the connect call can complete right away. Even so, connect is asynchronous, so allow a moment between connect and the first send. The other way to get reliability is an application-layer fallback strategy for message loss: on the client side, connect, send, and then wait for a response in a loop. Retry the send with exponential backoff; if the messages keep getting lost, the listener is not available.

> Indeed, in my larger system I have the combination you suggested: Pub and
> Rep in the router, Sub and Req in the workers.
> The connection is certainly there, because sometimes the second or third
> reply from the router to the workers is lost, but not (I think) before the
> first Pub message has been sent to the worker. But I did not manage to
> reproduce this exact situation in a minimal test case.
>

OK. My replies are based on the source you have provided so far. If my explanation wasn't clear, I might be better off modifying that source directly, but I have nothing to go by when you mention your other structures, so I can't do much there.

> Are there any "don't dos" or important inits that I missed?
> In ZMQ2 everything worked well. It started with switching to ZMQ4.
>
> If you want, I could draw a small picture of the structure I use.
>

Please do.
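To make the retry idea above concrete, here is a minimal sketch of the application-level loop: send, wait for a reply, and double the wait on every miss. It is deliberately transport-agnostic. `request_with_backoff` and the `Attempt` callback are hypothetical names, not part of libzmq or czmq. With libzmq the callback would typically wrap zmq_send(), a zmq_poll() using the given timeout, and zmq_recv(). Keep in mind that a plain REQ socket cannot send again until it has received a reply, so a real callback would probably have to close and reopen the socket on each retry.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>

// Hypothetical callback type (an assumption for this sketch): performs ONE
// send-and-wait round trip, waiting at most `timeout` for the answer.
// Returns true and fills `reply` if a response arrived in time.
typedef std::function<bool(std::chrono::milliseconds, std::string&)> Attempt;

// Retries `attempt` up to `max_attempts` times, doubling the timeout after
// every miss (exponential backoff). Returns false if every attempt failed,
// which the caller should read as "the listener is not available".
bool request_with_backoff(const Attempt& attempt,
                          int max_attempts,
                          std::chrono::milliseconds initial_timeout,
                          std::string& reply)
{
    assert(max_attempts > 0);
    std::chrono::milliseconds timeout = initial_timeout;
    for (int i = 0; i < max_attempts; ++i) {
        if (attempt(timeout, reply)) {
            return true;      // the peer answered, so the connection is up
        }
        timeout *= 2;         // wait twice as long on the next try
    }
    return false;             // continual loss: give up
}
```

If the function returns false after all attempts, treat the listener as unavailable rather than retrying forever.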
> What I noticed in your API is that some functions take a message** type and
> NULL the data pointer as a sign that "this data belongs to zmq now", but
> some don't (e.g. Send(socket*, byte*, size) for sending data directly
> without wrapping it in a zmsg).
> But I already tried leaking ALL data, so as not to free() data that may
> still linger in zmq, with only little improvement.
> I also checked with tdmalloc, valgrind, efence and duma, but they all say
> that my memory access is fine. (Or does zmq somehow work around those
> memory libs?)
>

Consult the API reference for the specific zmq library you are using; more than likely the API follows a strict convention regarding resource ownership. Otherwise, I would advise you to abandon delete and free calls in favor of RAII and language facilities like std::unique_ptr.

> Kind regards,
> Sven Koebnick
> --------------------------------------------------------
> E = mc² ± 2dBA ----- everything is relative
> ---------------------------------------------------------
>
> On 06.03.2015 at 17:29, Kenneth Adam Miller <
> [email protected]> wrote:
>
> I notice that in your public code you wait until you get a request on
> the sub socket and then publish on the workers socket. The workers socket
> is a pub socket, and receiving a request on the sub socket doesn't
> mean that all workers have connected. Why not, before you start publishing
> at all, have an additional rep socket and wait in a loop to receive and
> respond to exactly as many workers as you intend to kick off, before you
> start listening on the sub socket at all? Then the workers can send a req
> message to that rep socket and wait for a reply before sending something
> to the sub socket. I'm not sure what the sub socket is supposed to do; I
> suppose you identified the need for it since it's a many-to-one
> relationship: you have many workers, one publisher.
> But you can replace the sub socket's role in waiting for worker messages
> with a rep socket. In the worker, before sending to the rep socket, make a
> connect call to the publisher.
>
> On Fri, Mar 6, 2015 at 10:58 AM, <[email protected]> wrote:
>
>> Hi all!
>>
>> After migrating from ZMQ2 to ZMQ4 I had problems with vanished messages
>> (on REP/REQ as well as on PUB/SUB).
>>
>> I have now *managed to build a DIRTY "minimal" test case* (see below)
>> displaying at least >one< of my problems.
>>
>> The test case does the following:
>>
>> - create a router binding one SUB socket (for clients) and one PUB
>> socket (for workers)
>>
>> - create some workers listening on a SUB socket connected to the router's
>> PUB port
>>
>> - WAIT some seconds !!!
>>
>> - send 10000 numbers at full speed on a PUB socket connected to the
>> router's SUB port
>>
>> - workers expect numbers 0 to n to be sent as messages in the correct
>> order and complain about mismatching numbers
>>
>> The result shows that *about 1400 to 3000 of 10000 messages get lost*
>> before the rest reaches the workers.
>>
>> WHAT AM I DOING WRONG?
>>
>> It seems to get a *little* better if I
>>
>> - increase the threads in the context with zmq_ctx_set()
>>
>> - increase the number of workers
>>
>> - increase the SND and RCV high-water marks using zmq_setsockopt()
>>
>> but it never gets really good.
>>
>> When I send 100,000 messages, the first 3,500 get lost (?!?).
>>
>> When I send 1000 messages, none arrive ;o(
>>
>> When I set SNDHWM and RCVHWM to 0 (zero), which should be "unlimited",
>> nothing ever arrives.
>>
>> Many thanks
>>
>> sven
>>
>> =====SNIP===output==================================
>>
>> starting Router ...
>> thread id -1212605632
>> starting 10 Workers ...
>> finished starting Workers ...
>> sending messages ...
>> sending messages done
>> worker 2 expected message 0, got message 4616
>> worker 5 expected message 0, got message 4616
>> worker 9 expected message 0, got message 4616
>> worker 0 expected message 0, got message 4616
>> worker 7 expected message 0, got message 4616
>> worker 4 expected message 0, got message 4616
>> worker 1 expected message 0, got message 4616
>> worker 8 expected message 0, got message 4616
>> worker 6 expected message 0, got message 4616
>> worker 3 expected message 0, got message 4616
>> worker 9 ending after 5383 good messages. last message was 9999
>> worker 2 ending after 5383 good messages. last message was 9999
>> worker 7 ending after 5383 good messages. last message was 9999
>> worker 5 ending after 5383 good messages. last message was 9999
>> worker 8 ending after 5383 good messages. last message was 9999
>> worker 4 ending after 5383 good messages. last message was 9999
>> worker 1 ending after 5383 good messages. last message was 9999
>> worker 3 ending after 5383 good messages. last message was 9999
>> worker 6 ending after 5383 good messages. last message was 9999
>> worker 0 ending after 5383 good messages. last message was 9999
>> all own thread have terminated
>> router has terminated
>>
>> =====SNIP===code==================================
>>
>> #include <stdlib.h>
>> #include <stdio.h>
>> #include <pthread.h>
>> #include <unistd.h>
>> #include <zmq.h>
>> #include <czmq.h>
>> #include <zframe.h>
>> #include <string>
>>
>> int numWorkers=10;
>>
>> void sethwm(int o,void*s) {
>>     int opt;
>>     size_t len=4;
>>     opt=500000;
>>     int rc=zmq_setsockopt(s,o,&opt,len);
>>     if (rc!=0)
>>         printf("error setting HWM\n");
>>     opt=-1;
>>     rc=zmq_getsockopt(s,o,&opt,&len);
>>     if (rc!=0)
>>         printf("error getting HWM\n");
>> }
>>
>> class Router {
>> public: volatile bool leave=false;
>> private:
>>     void inline goRouter() {
>>         void *clients=zmq_socket(ctx, ZMQ_SUB);
>>         sethwm(ZMQ_RCVHWM,clients);
>>         void *workers=zmq_socket(ctx, ZMQ_PUB);
>>         sethwm(ZMQ_SNDHWM,workers);
>>         int rc=zmq_setsockopt(clients,ZMQ_SUBSCRIBE,"",0);
>>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>         rc=zmq_bind(clients,"tcp://127.0.0.1:10000");
>>         if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>>         rc=zmq_bind(workers,"tcp://127.0.0.1:10001");
>>         if (rc!=0) printf("Error %i %s in line %i\n",zmq_errno(),zmq_strerror(zmq_errno()),__LINE__ -1);
>>         zmq_pollitem_t items[1];
>>         items[0].socket=clients;
>>         items[0].events=ZMQ_POLLIN;
>>         int currentWorker=-1,nextWorker=0;
>>         int workerCount=1;
>>         bool lastReceived=false;
>>         do {
>>             items[0].revents=0;
>>             rc=zmq_poll(items,1,10);
>>             if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>>             if (rc==0) continue;
>>             if (items[0].revents) {
>>                 // we got a request
>>                 zmsg_t* request=zmsg_recv(items[0].socket);
>>                 if (request==0) printf("Error in line %i\n",__LINE__ -1);
>>                 zmsg_first(request);
>>                 zmsg_next(request);
>>                 zframe_t* data_f=zmsg_next(request);
>>                 char* data=(char*)zframe_data(data_f);
>>                 rc=zmsg_send(&request,workers);
>>                 if (request!=0) printf("Error in line %i\n",__LINE__ -1);
>>                 if (rc<0)
>>                     printf("error sending in router: %is %s\n",errno,strerror(errno));
>>             }
>>         } while (workerCount>0 && !leave);
>>     }
>> public:
>>     static void* ctx;
>>     static void* lauch_me(void*p) {
>>         static_cast<Router*>(p)->goRouter();
>>         return 0;
>>     }
>> };
>>
>> class Worker {
>> public: volatile bool leave=false;
>> private:
>>     void inline goWorker() {
>>         void *router=zmq_socket(ctx, ZMQ_SUB);
>>         sethwm(ZMQ_RCVHWM,router);
>>         char instanceName[10];
>>         memcpy(instanceName,"Worker000",10);
>>         instanceName[6]+=instanceNumber/100;
>>         instanceName[7]+=(instanceNumber%100)/10;
>>         instanceName[8]+=instanceNumber%10;
>>         int rc=zmq_setsockopt(router,ZMQ_SUBSCRIBE,0,0);
>>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>         rc=zmq_connect(router,"tcp://127.0.0.1:10001");
>>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>         zmq_pollitem_t items[1];
>>         items[0].socket=router;
>>         items[0].events=ZMQ_POLLIN;
>>         int lastMessageNumber=-1,currentMessageNumber=-1;
>>         bool leaveByMessage=false;
>>         int OKcount=0;
>>         while (!leave) {
>>             items[0].revents=0;
>>             rc=zmq_poll(items,1,10);
>>             if (rc<0) printf("Error in line %i\n",__LINE__ -1);
>>             if (rc==0) continue;
>>             if (items[0].revents) {
>>                 zmsg_t* request=zmsg_recv(items[0].socket);
>>                 zframe_t *data_f=zmsg_first(request);
>>                 char *data=(char*)zframe_data(data_f);
>>                 if (strncmp(data,"Die!",4)==0) {
>>                     leave=true; leaveByMessage=true;
>>                 } else {
>>                     currentMessageNumber= *((int*)data);
>>                     if (currentMessageNumber == lastMessageNumber+1)
>>                         OKcount++;
>>                     else
>>                         printf("worker %i expected message %i, got message %i\n",instanceNumber,lastMessageNumber+1,currentMessageNumber);
>>                     lastMessageNumber=currentMessageNumber;
>>                 }
>>             }
>>         }
>>         printf("worker %i ending after %i good messages%s. last message was %i \n",instanceNumber,OKcount,leaveByMessage?"":" by hard termination",lastMessageNumber);
>>         sleep(1);
>>         rc=zmq_disconnect(router,"tcp://127.0.0.1:10001");
>>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>         rc=zmq_close (router);
>>         if (rc!=0) printf("Error in line %i\n",__LINE__ -1);
>>     }
>> public:
>>     byte instanceNumber;
>>     static void* ctx;
>>     static void* lauch_me(void*p) {
>>         static_cast<Worker*>(p)->goWorker();
>>         return 0;
>>     }
>> };
>>
>> void* Router::ctx=0;
>> void* Worker::ctx=0;
>>
>> int main(int argc, char**argv) {
>>     Router::ctx=zmq_ctx_new(); // create global context
>>     zmq_ctx_set(Router::ctx,ZMQ_IO_THREADS,100);
>>     Worker::ctx=Router::ctx; // use same context in worker and router
>>
>>     Router *router=new Router();
>>     pthread_t r_tid;
>>     printf("starting Router ...\n");
>>     int ret=pthread_create(&r_tid, 0, Router::lauch_me, router);
>>     if (ret!=0) exit(1);
>>     bool a=true;
>>     // while (a) sleep (1);
>>     sleep(1);
>>     printf(" thread id %li\n",r_tid);
>>
>>     Worker *workers[100];
>>     pthread_t worker_ids[100];
>>     printf("starting %i Workers ...\n",numWorkers);
>>     for(int i=0; i<numWorkers; i++) {
>>         workers[i]=new Worker();
>>         workers[i]->instanceNumber=i;
>>         int ret=pthread_create(&worker_ids[i], 0, Worker::lauch_me, workers[i]);
>>         if (ret!=0) exit(1);
>>     }
>>     printf("finished starting Workers ...\n");
>>     sleep(10);
>>
>>     printf("sending messages ...\n");
>>
>>     void* socket=zmq_socket(Router::ctx,ZMQ_PUB);
>>     sethwm(ZMQ_SNDHWM,socket);
>>     zmq_connect(socket,"tcp://127.0.0.1:10000");
>>     for (int i=0; i<10000; i++) {
>>         // if (i%100==0) printf(".");
>>         int rc;
>>         int errorcount=0;
>>         int *datas=(int*)malloc(4); *datas=i;
>>         rc=zmq_send(socket,datas,sizeof(*datas),0);
>>         if (rc!=4) printf("error sending message %i\n",i);
>>     }
>>     printf("sending messages done\n");
>>     int rc=zmq_send(socket,"Die!",4,0);
>>     if (rc!=4) {
>>         printf("cound not send termination message to workers, killing them by static variable\n");
>>         for (int i=0; i<numWorkers; i++) {
>>             workers[i]->leave=true;
>>         }
>>     }
>>     sleep(30);
>>     router->leave=true;
>>     void*result;
>>     for (int i=0; i<numWorkers; i++) {
>>         workers[i]->leave=true;
>>         pthread_join(worker_ids[i],&result);
>>     }
>>     printf("all own thread have terminated\n");
>>     pthread_join(r_tid,&result);
>>     printf("router has terminated\n");
>> }
>>
>> printf("cound not send termination message to worker %i");
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
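
For the synchronization step suggested in the quoted message of 06.03.2015 (workers report in on a REP socket before the router publishes anything), a toy single-process model may help show why the ordering matters. Nothing here is libzmq code: `ToyPublisher`, `worker_ready` and `run_scenario` are made-up names, and the "publisher" simply drops messages for workers that have not subscribed yet, which is the assumption being illustrated.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-in for a PUB socket (not a libzmq type): a message reaches only
// the workers whose subscription is already in place. For everyone else it
// is dropped without any error, as PUB/SUB does for late joiners.
class ToyPublisher {
public:
    explicit ToyPublisher(std::size_t workers)
        : inbox_(workers), subscribed_(workers, false), ready_(0) {}

    // Models the worker side of the handshake: connect the SUB socket first,
    // then report "ready" on the extra REQ/REP pair. Returns how many
    // workers have reported in so far (what the router's REP loop counts).
    std::size_t worker_ready(std::size_t id) {
        if (!subscribed_[id]) {
            subscribed_[id] = true;
            ++ready_;
        }
        return ready_;
    }

    // True once every expected worker has completed the handshake.
    bool all_ready() const { return ready_ == inbox_.size(); }

    // Delivers `value` to subscribed workers only.
    void publish(int value) {
        for (std::size_t i = 0; i < inbox_.size(); ++i) {
            if (subscribed_[i]) inbox_[i].push_back(value);
        }
    }

    const std::vector<int>& received(std::size_t id) const { return inbox_[id]; }

private:
    std::vector<std::vector<int> > inbox_;
    std::vector<bool> subscribed_;
    std::size_t ready_;
};

// Hypothetical driver: publishes `early` messages before any worker has
// joined, then completes the handshake and publishes the rest up to `total`.
// With early == 0 this is the synchronized start described in the thread.
ToyPublisher run_scenario(std::size_t workers, int early, int total) {
    ToyPublisher pub(workers);
    int next = 0;
    for (; next < early; ++next) pub.publish(next);   // lost: nobody subscribed
    for (std::size_t id = 0; id < workers; ++id) pub.worker_ready(id);
    assert(pub.all_ready());                          // safe to publish from here
    for (; next < total; ++next) pub.publish(next);
    return pub;
}
```

Calling `run_scenario(3, 4, 10)` leaves every worker starting at message 4, the same shape as the "expected message 0, got message 4616" output, while `run_scenario(3, 0, 10)` delivers all ten. In a real setup zmq_connect() is asynchronous, so a worker may still need a short pause (or a probe message on the PUB/SUB pair) between connecting its SUB socket and reporting ready.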
_______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
