I agree, the sync mutex shouldn't be used. The new socket option sounds like a good idea: it keeps the default behaviour as-is and avoids breaking the API.
On 11 October 2016 at 12:22, Doron Somech <somdo...@gmail.com> wrote:
> The sync mutex is actually used for the thread-safe sockets, so I don't think
> it is a good idea to use it.
>
> Actually, we already have thread-safe sockets, so we can use radio-dish.
> Maybe create a socket option to use radio-dish instead of pair: by default
> the monitor will use a PAIR socket, and if the new option
> (ZMQ_MONITOR_USE_RADIO?) is enabled, the socket will create a RADIO socket
> instead.
>
> On Tue, Oct 11, 2016 at 2:03 PM, Auer, Jens <jens.a...@cgi.com> wrote:
>> Hi,
>>
>> I've patched socket_base_t to protect the monitor socket with a mutex. There
>> was an unused mutex "sync" already available in socket_base_t, so I've used
>> this :-). If you want, I can upload a pull request.
>>
>> Cheers,
>> Jens
>>
>> --
>> Dr. Jens Auer | CGI | Software Engineer
>> CGI Deutschland Ltd. & Co. KG
>> Rheinstraße 95 | 64295 Darmstadt | Germany
>> T: +49 6151 36860 154
>> jens.a...@cgi.com
>>
>> Our mandatory disclosures pursuant to § 35a GmbHG / §§ 161, 125a HGB can be
>> found at de.cgi.com/pflichtangaben.
>>
>> CONFIDENTIALITY NOTICE: Proprietary/Confidential information belonging to
>> CGI Group Inc. and its affiliates may be contained in this message. If you
>> are not a recipient indicated or intended in this message (or responsible
>> for delivery of this message to such person), or you think for any reason
>> that this message may have been addressed to you in error, you may not use
>> or copy or deliver this message to anyone else. In such case, you should
>> destroy this message and are asked to notify the sender by reply e-mail.
>>
>> From: zeromq-dev [mailto:zeromq-dev-boun...@lists.zeromq.org] On Behalf Of
>> Auer, Jens
>> Sent: 11 October 2016 09:31
>> To: ZeroMQ development list
>> Subject: Re: [zeromq-dev] Assertion failure with socket monitor when clients
>> disconnect
>>
>> Hi Doron,
>>
>> I'm not a big fan of monitoring and prefer other solutions.
>> I actually think of deprecating it; the implementation is buggy and violates
>> the rule that a socket must not be shared between threads, which is probably
>> what is causing your issue.
>>
>> That's quite a shock given that it is part of the stable API. I think in
>> this case it should not only be deprecated but removed from the API
>> completely. On the other hand, something to monitor the connections is
>> probably needed. Our clients are very concerned with TCP connections and
>> would always insist on the ability to log TCP connection state changes. It
>> would also be nice to get some statistics from the connections, e.g. the
>> number of received bytes or messages.
>>
>> Anyway, you have two options. I think you can manage without the
>> monitoring; I can try and help you find other solutions. Another option is
>> to try not to listen to all events; maybe this will avoid the sharing
>> violation.
>>
>> I would like to replace the monitor with something else, but I am not sure
>> how to do this given our requirements. Currently, we use the monitor to
>>
>> - log TCP connection events for connect, disconnect, accept, listen
>>   and reconnection-retry events
>> - limit the number of reconnection attempts
>>
>> Unfortunately, this includes disconnect events, which are exactly the events
>> causing the crashes right now. The other events are probably rare enough
>> that there is no problem.
>>
>> Is there another way to implement this without using the socket monitor? We
>> use ROUTER, DEALER, SUB, XPUB and STREAM sockets. I have full control over
>> the protocol between the ROUTER/DEALER pair, but I cannot change the
>> protocol between PUB/SUB and external clients over STREAM sockets, so I
>> cannot add control messages there.
>>
>> I think an easy fix for my issues would be to add a mutex to protect the
>> monitor socket in socket_base_t.
>> I guess this was not done because it would block the thread and probably
>> impact performance, but at least it would work correctly and not crash. It
>> should be good enough for our use case.
>>
>> An idea for a non-blocking solution would be an independent monitor class,
>> analogous to the existing listener and accepter classes, which has an inproc
>> SUB socket and the PAIR socket. Each ZeroMQ socket would then create a
>> monitor when it is created, and each session object would have a PUB socket
>> to broadcast events to the monitor. The monitor then forwards events
>> received from individual clients/sessions on different I/O threads to the
>> PAIR socket where the application code can connect.
>>
>> Best wishes,
>> Jens
>>
>> On Oct 10, 2016 17:45, "Auer, Jens" <jens.a...@cgi.com> wrote:
>>
>> Hi,
>>
>> I have an issue with the socket monitor on a ZMQ_STREAM socket. We have
>> noticed that our software sometimes crashes with an internal ZeroMQ
>> assertion when non-ZeroMQ clients disconnect.
>> I can reproduce this with a small test program that
>> - starts a context with 6 threads
>> - creates a ZMQ_SUB socket with a monitor
>> - creates a ZMQ_PAIR socket and connects it to the monitor for the SUB socket
>> - creates a ZMQ_STREAM socket and binds it
>> - creates a socket monitor with a ZMQ_PAIR socket to get events
>> - runs in a loop, zmq_polling on the sockets
>>
>> When I start 500 instances of this program, connect 6 clients to each of
>> them (I use netcat >/dev/null), and then kill the clients, I randomly get
>> crashes with ZeroMQ assertion failures.
>> Most of the time the failure is in the code processing the socket monitor
>> event, which I copied from the man page:
>>
>> Program terminated with signal 6, Aborted.
>> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
>> Missing separate debuginfos, use: debuginfo-install apr-1.4.8-3.el7.x86_64
>> apr-util-1.5.2-6.el7.x86_64 cyrus-sasl-lib-2.1.26-20.el7_2.x86_64
>> expat-2.1.0-8.el7.x86_64 glibc-2.17-106.el7_2.6.x86_64
>> libdb-5.3.21-19.el7.x86_64 libgcc-4.8.5-4.el7.x86_64
>> libstdc++-4.8.5-4.el7.x86_64 libuuid-2.23.2-26.el7_2.2.x86_64
>> log4cxx-0.10.0-16.el7.x86_64 nspr-4.11.0-1.el7_2.x86_64
>> nss-3.21.0-9.el7_2.x86_64 nss-softokn-freebl-3.16.2.3-14.2.el7_2.x86_64
>> nss-util-3.21.0-2.2.el7_2.x86_64 openldap-2.4.40-9.el7_2.x86_64
>> openssl-libs-1.0.1e-51.el7_2.5.x86_64 zlib-1.2.7-15.el7.x86_64
>> (gdb) bt
>> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
>> #1  0x00007fb564444ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007fb56443c566 in __assert_fail_base () from /lib64/libc.so.6
>> #3  0x00007fb56443c612 in __assert_fail () from /lib64/libc.so.6
>> #4  0x00000000004b16f9 in get_monitor_event (monitor=0x2512b90, value=0x0,
>>     address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:47
>> #5  0x00000000004b423e in main (argc=3, argv=0x7fffe10a2668) at
>>     /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> Sometimes, I get an internal zmq assertion:
>>
>> Program terminated with signal 6, Aborted.
>> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
>> (gdb) bt
>> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
>> #1  0x00007f3e75f19ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007f3e76cff769 in zmq::zmq_abort
>>     (errmsg_=errmsg_@entry=0x7f3e76d328ed "check ()") at src/err.cpp:83
>> #3  0x00007f3e76d06232 in zmq::msg_t::size (this=this@entry=0x7ffd200e9a50)
>>     at src/msg.cpp:254
>> #4  0x00007f3e76d2e035 in zmq_msg_size (msg_=msg_@entry=0x7ffd200e9a50) at
>>     src/zmq.cpp:627
>> #5  0x00007f3e76d2e415 in s_recvmsg (s_=<optimized out>,
>>     msg_=0x7ffd200e9a50, flags_=<optimized out>) at src/zmq.cpp:459
>> #6  0x00000000004b1654 in get_monitor_event (monitor=0x1d3fb90, value=0x0,
>>     address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
>> #7  0x00000000004b423e in main (argc=3, argv=0x7ffd200e9d68) at
>>     /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> I've added assertions to the internal function calls in ZeroMQ to better
>> trace the error.
>> It turns out that the message is already corrupted when it is read from the
>> internal pipe:
>>
>> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
>> (gdb) bt
>> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
>> #1  0x00007fd3bc929ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007fd3bd70f769 in zmq::zmq_abort
>>     (errmsg_=errmsg_@entry=0x7fd3bd742d08 "queue.front().check()") at
>>     src/err.cpp:83
>> #3  0x00007fd3bd71eb1f in zmq::ypipe_t<zmq::msg_t, 256>::read
>>     (this=0x1f978f0, value_=0x7ffd760e8540) at src/ypipe.hpp:346
>> #4  0x00007fd3bd71e259 in zmq::pipe_t::read (this=0x1f9f9a0,
>>     msg_=msg_@entry=0x7ffd760e8540) at src/pipe.cpp:162
>> #5  0x00007fd3bd71c63e in zmq::pair_t::xrecv (this=0x1f96b90,
>>     msg_=0x7ffd760e8540) at src/pair.cpp:114
>> #6  0x00007fd3bd728b5b in zmq::socket_base_t::recv (this=0x1f96b90,
>>     msg_=msg_@entry=0x7ffd760e8540, flags_=0) at src/socket_base.cpp:910
>> #7  0x00007fd3bd73e809 in s_recvmsg (s_=<optimized out>,
>>     msg_=0x7ffd760e8540, flags_=<optimized out>) at src/zmq.cpp:456
>> #8  0x00000000004b1654 in get_monitor_event (monitor=0x1f96b90, value=0x0,
>>     address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
>> #9  0x00000000004b423e in main (argc=3, argv=0x7ffd760e8858) at
>>     /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> The assertion is triggered by additional checks in the ypipe_t class, which
>> I specialized for zmq::msg_t to add the checks. The code is:
>>
>> 311         inline bool check_read ()
>> 312         {
>> 313             // Was the value prefetched already? If so, return.
>> 314             if (&queue.front () != r && r)
>> 315                 return true;
>> 316
>> 317             // There's no prefetched value, so let us prefetch more values.
>> 318             // Prefetching is to simply retrieve the
>> 319             // pointer from c in atomic fashion. If there are no
>> 320             // items to prefetch, set c to NULL (using compare-and-swap).
>> 321             r = c.cas (&queue.front (), NULL);
>> 322
>> 323             // If there are no elements prefetched, exit.
>> 324             // During pipe's lifetime r should never be NULL, however,
>> 325             // it can happen during pipe shutdown when items
>> 326             // are being deallocated.
>> 327             if (&queue.front () == r || !r)
>> 328                 return false;
>> 329
>> 330             zmq_assert (queue.front ().check ());
>> 331
>> 332             // There was at least one value prefetched.
>> 333             return true;
>> 334         }
>> 335
>> 336         // Reads an item from the pipe. Returns false if there is no
>> 337         // value available.
>> 338         inline bool read (T *value_)
>> 339         {
>> 340             // Try to prefetch a value.
>> 341             if (!check_read ())
>> 342                 return false;
>> 343
>> 344             // There was at least one value prefetched.
>> 345             // Return it to the caller.
>> 346             zmq_assert (queue.front ().check ());
>> 347             *value_ = queue.front ();
>> 348             zmq_assert (value_->check ());
>> 349             queue.pop ();
>> 350             zmq_assert (value_->check ());
>> 351
>> 352             return true;
>> 353         }
>>
>> I sometimes get another assertion when sending the monitor event, but here
>> I am not sure whether it is due to my modification adding the assertion.
>> The stack trace is:
>>
>> #0  0x00007feaecc625f7 in raise () from /lib64/libc.so.6
>> #1  0x00007feaecc63ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007feaeda49769 in zmq::zmq_abort
>>     (errmsg_=errmsg_@entry=0x7feaeda7ccf3 "queue.back().check()") at
>>     src/err.cpp:83
>> #3  0x00007feaeda58856 in zmq::ypipe_t<zmq::msg_t, 256>::write
>>     (this=0xaa2670, value_=..., incomplete_=<optimized out>) at
>>     src/ypipe.hpp:259
>> #4  0x00007feaeda57645 in zmq::pipe_t::write (this=0xaac1c0,
>>     msg_=msg_@entry=0x7feae7ce10f0) at src/pipe.cpp:215
>> #5  0x00007feaeda56490 in zmq::pair_t::xsend (this=0xaa1ed0,
>>     msg_=0x7feae7ce10f0) at src/pair.cpp:90
>> #6  0x00007feaeda62971 in zmq::socket_base_t::send
>>     (this=this@entry=0xaa1ed0, msg_=msg_@entry=0x7feae7ce10f0,
>>     flags_=flags_@entry=2) at src/socket_base.cpp:843
>> #7  0x00007feaeda7846c in s_sendmsg (s_=0xaa1ed0, msg_=0x7feae7ce10f0,
>>     flags_=2) at src/zmq.cpp:346
>> #8  0x00007feaeda62d93 in zmq::socket_base_t::monitor_event (this=0xaa16a0,
>>     event_=event_@entry=512, value_=25, addr_="tcp://192.168.120.1:9494") at
>>     src/socket_base.cpp:1357
>> #9  0x00007feaeda62f1d in zmq::socket_base_t::event_disconnected
>>     (this=<optimized out>, addr_="tcp://192.168.120.1:9494", fd_=<optimized
>>     out>) at src/socket_base.cpp:1344
>> #10 0x00007feaeda6b45a in zmq::stream_engine_t::error
>>     (this=this@entry=0x7feae00022c0,
>>     reason=reason@entry=zmq::stream_engine_t::connection_error) at
>>     src/stream_engine.cpp:936
>> #11 0x00007feaeda6c5db in zmq::stream_engine_t::in_event
>>     (this=0x7feae00022c0) at src/stream_engine.cpp:300
>> #12 0x00007feaeda493ee in zmq::epoll_t::loop (this=0xaa0830) at
>>     src/epoll.cpp:176
>> #13 0x00007feaeda73150 in thread_routine (arg_=0xaa08b0) at
>>     src/thread.cpp:96
>> #14 0x00007feaed816dc5 in start_thread () from /lib64/libpthread.so.0
>> #15 0x00007feaecd23ced in clone () from /lib64/libc.so.6
>>
>> and the code in question is
>>
>> 253         inline void write (const T &value_, bool incomplete_)
>> 254         {
>> 255             zmq_assert (value_.check ());
>> 256
>> 257             // Place the value to the queue, add new terminator element.
>> 258             queue.back () = value_;
>> 259             queue.push ();
>> 260             zmq_assert (queue.back ().check ());
>> 261
>> 262             // Move the "flush up to here" pointer.
>> 263             if (!incomplete_)
>> 264                 f = &queue.back ();
>> 265         }
>>
>> I can also reproduce the crashes with a simpler test program that has only
>> the ZMQ_STREAM socket with its monitor and does not open a ZMQ_SUB socket.
>> I could not reproduce it with fewer threads, i.e. the default of 2 I/O
>> threads.
>>
>> The code for my test program is
>>
>> #include <cassert>
>> #include <cstdint>
>> #include <cstdlib>
>> #include <cstring>
>> #include <iostream>
>> #include <string>
>> #include <vector>
>>
>> #include <zmq.hpp>
>>
>> static int
>> get_monitor_event (void *monitor,
>>                    int *value = nullptr,
>>                    char **address = nullptr)
>> {
>>     // First frame in message contains event number and value
>>     zmq_msg_t msg;
>>     zmq_msg_init (&msg);
>>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>>         return -1;              // Interrupted, presumably
>>     assert (zmq_msg_more (&msg));
>>
>>     uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
>>     uint16_t event = *(uint16_t *) (data);
>>     if (value)
>>         *value = *(uint32_t *) (data + 2);
>>     zmq_msg_close (&msg);
>>
>>     // Second frame in message contains event address
>>     zmq_msg_init (&msg);
>>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>>         return -1;              // Interrupted, presumably
>>     assert (!zmq_msg_more (&msg));
>>
>>     if (address) {
>>         data = (uint8_t *) zmq_msg_data (&msg);
>>         size_t size = zmq_msg_size (&msg);
>>         *address = (char *) malloc (size + 1);
>>         memcpy (*address, data, size);
>>         (*address)[size] = 0;   // note: *address [size] = 0 was a precedence bug
>>     }
>>     zmq_msg_close (&msg);
>>     return event;
>> }
>>
>> int main (int argc, char *argv[])
>> {
>>     std::string const address = argv[1];
>>     std::string const port = argv[2];
>>     std::string const stream_monitor_address ("inproc://stream_monitor");
>>     std::string const sub_monitor_address ("inproc://sub_monitor");
>>
>>     zmq::context_t ctx (6);
>>     zmq::socket_t stream (ctx, ZMQ_STREAM);
>>     zmq::socket_t sub (ctx, ZMQ_SUB);
>>
>>     zmq_socket_monitor (static_cast<void*> (stream),
>>                         stream_monitor_address.c_str (), ZMQ_EVENT_ALL);
>>     zmq_socket_monitor (static_cast<void*> (sub),
>>                         sub_monitor_address.c_str (), ZMQ_EVENT_ALL);
>>
>>     zmq::socket_t stream_monitor (ctx, ZMQ_PAIR);
>>     zmq::socket_t sub_monitor (ctx, ZMQ_PAIR);
>>
>>     stream_monitor.connect (stream_monitor_address.c_str ());
>>     sub_monitor.connect (sub_monitor_address.c_str ());
>>
>>     stream.bind ("tcp://" + address + ":" + port);
>>     //sub.connect ("tcp://127.0.0.1:6000");
>>
>>     std::vector<zmq::message_t> clientIds;
>>
>>     while (1)
>>     {
>>         zmq::pollitem_t items[4] = {
>>             {static_cast<void*> (stream_monitor), -1, ZMQ_POLLIN, 0},
>>             {static_cast<void*> (stream), -1, ZMQ_POLLIN, 0},
>>             {static_cast<void*> (sub_monitor), -1, ZMQ_POLLIN, 0},
>>             {static_cast<void*> (sub), -1, ZMQ_POLLIN, 0},
>>         };
>>
>>         auto rc = zmq::poll (items, 4);
>>
>>         if (rc != -1)
>>         {
>>             if (0 != (items[1].revents & ZMQ_POLLIN))
>>             {
>>                 zmq::message_t id;
>>                 zmq::message_t m;
>>
>>                 stream.recv (&id);
>>                 stream.recv (&m);
>>             }
>>             if (0 != (items[3].revents & ZMQ_POLLIN))
>>             {
>>             }
>>             if (0 != (items[0].revents & ZMQ_POLLIN))
>>             {
>>                 std::cout << "STREAM MONITOR " << get_monitor_event (
>>                     static_cast<void*> (stream_monitor)) << std::endl;
>>             }
>>             if (0 != (items[2].revents & ZMQ_POLLIN))
>>             {
>>                 std::cout << "SUB MONITOR " << get_monitor_event (
>>                     static_cast<void*> (sub_monitor)) << std::endl;
>>             }
>>         }
>>     }
>>
>>     return 0;
>> }
>>
>> It uses the C++ interface from
>> https://github.com/zeromq/cppzmq/blob/master/zmq.hpp.
>>
>> Best wishes,
>> Jens Auer
>>
>> --
>> Jens Auer | CGI | Software Engineer
>> CGI (Germany) GmbH & Co. KG
>> Rheinstraße 95 | 64295 Darmstadt | Germany
>> T: +49 6151 36860 154
>> jens.a...@cgi.com
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> zeromq-dev@lists.zeromq.org
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev