Hi again,

I was so happy to share the self-pipe trick that I must have overlooked
the zmq_proxy part of your description. Sorry about that; I don't think
my answer helps with your problem, at least not this time.

regards,
--
Patrick.

On Tue, 17 Apr 2018 15:37:59 +0200
Patrick Boettcher <[email protected]> wrote:

> Hi Giordano,
>
>
> On Tue, 17 Apr 2018 13:10:33 +0000
> "Cerizza, Giordano" <[email protected]> wrote:
>
> > Hi,
> > I have the following problem: I have a source of messages connected
> > to a proxy; the PULL/PUSH proxy reroutes the messages via inproc to
> > a multithreaded service that receives the messages and pushes them
> > to a destination sink. Here is some pseudocode to help understand
> > the streaming class:
> >
> > void main(){
> >   // definition of context (ctx_) and sockets for PULL/PUSH proxy
> >   // (frontend,backend)
> >   (...)
> >   pthread worker[N]
> >   for loop for number of thread
> >     pthread_create(&worker[i], NULL, worker_task, (void*)ctx_)
> >   try{
> >     zmq::proxy (*frontend, *backend, NULL);
> >   } catch(...){
> >     // stuff
> >   }
> > }
>
> (side-note: if you can, try using std::thread in C++)
>
> > void* worker_task(void *arg){
> >   zmq::context_t *context = (zmq::context_t*) arg;
> >   // definition of the sockets to pull from (socket_from) and push to
> >   // (socket_to)
> >   (...)
> >   // operations on messages (receive and send)
> >   (...)
> >   // exiting the worker_task
> >   pthread_exit(NULL);
> > }
> >
> > Observations: my system hangs before exiting the child thread. I
> > tried to close the sockets (zmq_close) and destruct (zmq_term) the
> > context for the child but nothing happens. How do I safely leave the
> > thread and move on with my code i.e. close the sockets and destroy
> > the context for the proxy, destruct the streaming class, and move to
> > operate on the message sink?
>
> There are several approaches to solving this problem. I'm using one
> that has been known for a long time (since before ZMQ existed). It is
> done with a pipe - the mechanism is called self-pipe. In ZMQ I use
> inproc for that.
>
> You create two ZMQ-PAIR sockets which bind/connect to the
> same inproc://-endpoint. In your thread you use poll on your
> data socket(s) and on one of the inproc-sockets. If you have a
> 'revent' on this socket, you clean up and exit your thread.
>
> In the main-thread, when you want to terminate your program and thus
> your thread, you write "something" on the other inproc-socket and then
> you call pthread_join on your thread, knowing that it will terminate
> correctly very soon.
>
> Here is some (pseudo)-code:
>
> Main-thread, creation and thread-start
>
>   // create two sockets:
>   std::string inprocEp = "inproc://#" +
>       std::to_string(internalCount_.fetch_add(1));
>   internal_[0]->bind(inprocEp);
>   internal_[1]->connect(inprocEp);
>
>   internal_[0]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
>   internal_[1]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
>
>   receiverThread_ = std::thread(&classname::threadMethod, this);
>
> In thread:
>
>   while (1) {
>     zmq::pollitem_t items[] = {{*socket, 0, ZMQ_POLLOUT, 0},
>                                {*internal_[0], 0, ZMQ_POLLIN, 0}};
>
>     auto ret = zmq::poll(items, 2, 5000); // 5 secs
>     // check ret
>     [..]
>     // an event on internal socket -> shutdown requested
>     if (items[1].revents != 0)
>       break;
>   }
>
> In main-thread ending code:
>
>   internal_[1]->send("HUP", 3);
>   receiverThread_.join();
>
> The variables used here:
>
>   static std::atomic<unsigned> internalCount_;
>   std::shared_ptr<zmq::socket_t> internal_[2];
>
>   std::thread receiverThread_;
>
> This code is using C++11-types, I hope you can use them as well.
>
> HTH,
> --
> Patrick.
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
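
For reference, the self-pipe mechanism described in the quoted message can be sketched in its plain POSIX form, without ZMQ. This is only a minimal illustration under stated assumptions: it uses pipe() and poll() in place of the inproc PAIR sockets and zmq::poll, and the names run_self_pipe_demo, data_pipe and wake_pipe are hypothetical and do not appear in the thread. It does not address stopping zmq::proxy itself.

```cpp
#include <poll.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <thread>

// Hypothetical demo: a worker polls a data pipe and a wake-up pipe.
// Returns the number of data bytes the worker read before it was asked
// to shut down, or -1 if setting up or writing to the pipes failed.
int run_self_pipe_demo() {
    int data_pipe[2];  // [0] = read end, [1] = write end
    int wake_pipe[2];  // the "self-pipe" used only to request shutdown
    if (pipe(data_pipe) != 0) return -1;
    if (pipe(wake_pipe) != 0) {
        close(data_pipe[0]);
        close(data_pipe[1]);
        return -1;
    }

    std::atomic<int> consumed{0};

    std::thread worker([&] {
        for (;;) {
            pollfd fds[2];
            fds[0].fd = data_pipe[0];
            fds[0].events = POLLIN;
            fds[0].revents = 0;
            fds[1].fd = wake_pipe[0];
            fds[1].events = POLLIN;
            fds[1].revents = 0;

            int ret = poll(fds, 2, 5000);  // 5 second timeout
            if (ret < 0) {
                if (errno == EINTR) continue;  // interrupted, poll again
                break;                         // unexpected error
            }
            if (ret == 0) continue;  // timeout, nothing to do

            // Handle pending data first, so nothing already queued is lost.
            if (fds[0].revents & POLLIN) {
                char buf[64];
                ssize_t n = read(data_pipe[0], buf, sizeof buf);
                if (n <= 0) break;  // EOF or read error
                consumed += static_cast<int>(n);
            }
            // Any event on the wake-up pipe means shutdown was requested.
            if (fds[1].revents != 0) break;
        }
    });

    // Main thread: send some data, then request shutdown and join.
    bool ok = write(data_pipe[1], "hello", 5) == 5;
    ok = (write(wake_pipe[1], "HUP", 3) == 3) && ok;
    if (!ok) {
        // Closing the write end of the wake-up pipe also wakes the worker.
        close(wake_pipe[1]);
        wake_pipe[1] = -1;
    }
    worker.join();

    close(data_pipe[0]);
    close(data_pipe[1]);
    close(wake_pipe[0]);
    if (wake_pipe[1] >= 0) close(wake_pipe[1]);
    return ok ? consumed.load() : -1;
}
```

In the ZMQ variant from the quoted message, the two ends of wake_pipe correspond to the two inproc PAIR sockets (internal_[0] and internal_[1]), and poll() corresponds to zmq::poll. The worker here drains pending data before checking the wake-up descriptor; whether that ordering is wanted depends on the application.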
