Hi Giordano,
On Tue, 17 Apr 2018 13:10:33 +0000
"Cerizza, Giordano" wrote:

> Hi,
> I have the following problem: I have a source of messages connected
> to a proxy; the PULL/PUSH proxy reroutes the messages via inproc to a
> multithreaded service that receives the messages and pushes them to a
> destination sink. Here some pseudocode to help understand the
> streaming class:
>
> void main(){
>   // definition of context (ctx_) and sockets for PULL/PUSH proxy
>   // (frontend,backend)
>   (...)
>   pthread worker[N]
>   for loop for number of thread
>     pthread_create(&worker[i], NULL, worker_task, (void*)ctx_)
>   try{
>     zmq::proxy (*frontend, *backend, NULL);
>   } catch(...){
>     // stuff
>   }
> }

(side-note: if you can, try using std::thread in C++)

> void* worker_task(void *arg){
>   zmq::context_t *context = (zmq::context_t*) arg;
>   // definition of the sockets to pull from (socket_from) and push to
>   // (socket_to)
>   (...)
>   // operations on messages (receive and send)
>   (...)
>   // exiting the worker_task
>   pthread_exit(NULL);
> }
>
> Observations: my system hangs before exiting the child thread. I
> tried to close the sockets (zmq_close) and destruct (zmq_term) the
> context for the child but nothing happens. How do I safely leave the
> thread and move on with my code i.e. close the sockets and destroy
> the context for the proxy, destruct the streaming class, and move to
> operate on the message sink?

There are several ways to solve this problem. The one I use has been
known for a long time (it predates ZMQ). Classically it is done with a
pipe, and the mechanism is called the self-pipe trick. In ZMQ I use
inproc for that.

You create two ZMQ_PAIR sockets which bind/connect to the same
inproc:// endpoint. In your thread you poll on your data socket(s) and
on one of the inproc sockets. If you get a 'revent' on this inproc
socket, you clean up and exit your thread.
In the main thread, when you want to terminate your program and thus
your thread, you write "something" to the other inproc socket and then
call pthread_join (or std::thread::join) on your thread, knowing that
it will terminate correctly very soon.

Here is some (pseudo)code:

Main thread, creation and thread start:

    // set up the two internal sockets (both ZMQ_PAIR, same context);
    // 'linger' is an int defined elsewhere
    std::string inprocEp = "inproc://#" +
        std::to_string(internalCount_.fetch_add(1));
    internal_[0]->bind(inprocEp);
    internal_[1]->connect(inprocEp);
    internal_[0]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    internal_[1]->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    receiverThread_ = std::thread(&classname::threadMethod, this);

In the thread:

    while (1) {
        // items[0]: data socket, items[1]: internal shutdown socket
        zmq::pollitem_t items[] = {{*socket, 0, ZMQ_POLLIN, 0},
                                   {*internal_[0], 0, ZMQ_POLLIN, 0}};
        auto ret = zmq::poll(items, 2, 5000); // 5 secs
        // check ret
        [..]
        // an event on internal socket -> shutdown requested
        if (items[1].revents != 0)
            break;
    }

In the main thread, ending code:

    internal_[1]->send("HUP", 3);
    receiverThread_.join();

The variables used here:

    static std::atomic<unsigned> internalCount_;
    std::shared_ptr<zmq::socket_t> internal_[2];
    std::thread receiverThread_;

This code uses C++11 types; I hope you can use them as well.

HTH,

--
Patrick.
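P.S. For comparison, here is a minimal sketch of the classic self-pipe
trick with a plain POSIX pipe instead of ZMQ sockets. It assumes a
POSIX system (pipe, poll) and C++11; the class name StoppableWorker and
its methods are made up for illustration and are not part of any
library. A real worker would also list its data descriptors in the
poll set.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <thread>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: a worker thread that is stopped through a self-pipe.
class StoppableWorker {
public:
    StoppableWorker() {
        if (pipe(fds_) != 0)
            throw std::runtime_error("pipe() failed");
        thread_ = std::thread(&StoppableWorker::run, this);
    }

    ~StoppableWorker() {
        stop();
        close(fds_[0]);
        close(fds_[1]);
    }

    // Ask the worker to exit and wait for it. Calling it twice is harmless.
    void stop() {
        if (!thread_.joinable())
            return;
        const char byte = 'x';
        ssize_t n = write(fds_[1], &byte, 1); // wakes up poll() in run()
        assert(n == 1);
        (void)n;
        thread_.join();
    }

    bool exitedCleanly() const { return exited_.load(); }

private:
    void run() {
        for (;;) {
            // Only the read end of the pipe is polled in this sketch.
            pollfd items[] = {{fds_[0], POLLIN, 0}};
            int ret = poll(items, 1, 5000); // 5 secs
            if (ret < 0)
                break;                      // poll error: give up
            if (items[0].revents != 0)
                break;                      // shutdown requested
            // ret == 0 means timeout: just poll again
        }
        exited_.store(true);                // clean-up would go here
    }

    int fds_[2];
    std::atomic<bool> exited_{false};
    std::thread thread_;
};
```

Usage is the same as in the ZMQ variant: construct the object to start
the thread, then call stop() from the main thread when you want it to
finish. The write() plays the role of the "HUP" message.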
