I agree; the sync mutex shouldn't be used.

The new socket option sounds like a good idea: it would keep the default
behaviour as-is and avoid breaking the API.

On 11 October 2016 at 12:22, Doron Somech <somdo...@gmail.com> wrote:
> The sync mutex is actually used for the thread-safe sockets, so I don't
> think it is a good idea to use it here.
>
> Actually, we already have thread-safe sockets, so we could use radio-dish.
> Maybe we could add a socket option to use radio-dish instead of pair: by
> default the monitor would use a PAIR socket, and if the new option
> (ZMQ_MONITOR_USE_RADIO?) is enabled the socket would create a RADIO socket
> instead.
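>
> From the application side it could look something like this (a sketch only;
> ZMQ_MONITOR_USE_RADIO and the group name are made up, and radio-dish is
> still a draft API):
>
>     int on = 1;
>     zmq_setsockopt (sock, ZMQ_MONITOR_USE_RADIO, &on, sizeof (on));
>     zmq_socket_monitor (sock, "inproc://monitor", ZMQ_EVENT_ALL);
>
>     //  Events could then be read with a thread-safe DISH socket
>     //  instead of a PAIR socket:
>     void *dish = zmq_socket (ctx, ZMQ_DISH);
>     zmq_connect (dish, "inproc://monitor");
>     zmq_join (dish, "monitor");   //  group name is hypothetical too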
>
>
>
> On Tue, Oct 11, 2016 at 2:03 PM, Auer, Jens <jens.a...@cgi.com> wrote:
>> Hi,
>>
>>
>>
>> I’ve patched socket_base_t to protect the monitor socket with a mutex. There
>> was an unused mutex “sync” already available in socket_base_t, so I’ve used
>> that. If you want, I can open a pull request.
>>
>>
>>
>> Cheers,
>>
>>   Jens
>>
>>
>>
>>
>>
>>
>> From: zeromq-dev [mailto:zeromq-dev-boun...@lists.zeromq.org] On Behalf Of
>> Auer, Jens
>> Sent: 11 October 2016 09:31
>> To: ZeroMQ development list
>> Subject: Re: [zeromq-dev] Assertion failure with socket monitor when clients
>> disconnect
>>
>>
>>
>> Hi Doron,
>>
>> I'm not a big fan of monitoring and prefer other solutions. I'm actually
>> thinking of deprecating it: the implementation is buggy and actually violates
>> the rule not to share a socket between threads, which is probably what is
>> causing your issue.
>>
>> That’s quite a shock given that it is part of the stable API. In that case I
>> think it should not only be deprecated but removed from the API completely.
>> On the other hand, something to monitor the connections is probably needed.
>> Our clients are very concerned with TCP connections and would always insist
>> on the ability to log TCP connection state changes. It would also be nice to
>> get some statistics from the connections, e.g. the number of received bytes
>> or messages.
>>
>> Anyway, you have two options. I think you can manage without the monitoring,
>> and I can try to help you find other solutions. Another option is to not
>> listen to all events; maybe that will avoid the sharing violation.
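>>
>> Narrowing the events is just a different event mask in the
>> zmq_socket_monitor call, e.g. (a sketch; sock stands for the raw socket
>> handle):
>>
>>     //  Only subscribe to the events actually needed instead of
>>     //  ZMQ_EVENT_ALL; fewer events means fewer concurrent writes
>>     //  to the monitor's PAIR socket.
>>     zmq_socket_monitor (sock, "inproc://monitor",
>>                         ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);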
>>
>> I would like to replace the monitor with something else, but I am not sure
>> how to do this given our requirements. Currently, we use the monitor to
>>
>> - log TCP connection state changes (connect, disconnect, accept, listen and
>>   reconnection-retry events)
>>
>> - limit the number of reconnection attempts
>>
>> Unfortunately, this includes disconnect events, which are exactly the events
>> causing the crashes right now. The other events are probably rare enough not
>> to cause problems.
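>>
>> For the second point, a partial substitute without the monitor might be the
>> standard reconnect options (a sketch; note they shape the retry backoff
>> rather than capping the number of attempts, and sock stands for the raw
>> socket handle):
>>
>>     int ivl = 1000;        //  first retry after 1 s
>>     int ivl_max = 30000;   //  exponential backoff capped at 30 s
>>     zmq_setsockopt (sock, ZMQ_RECONNECT_IVL, &ivl, sizeof (ivl));
>>     zmq_setsockopt (sock, ZMQ_RECONNECT_IVL_MAX, &ivl_max, sizeof (ivl_max));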
>>
>> Is there another way to implement this without using the socket monitor? We
>> use Router, Dealer, Sub, XPub and Stream sockets. I have full control over
>> the protocol between the Router/Dealer, but I cannot change the protocol
>> between Pub/Sub and external clients over Stream sockets, so I cannot add
>> control messages here.
>>
>> I think an easy fix for my issue would be to add a mutex protecting the
>> monitor socket in socket_base_t. I guess this was not done because it would
>> block the thread and probably impact performance, but at least it would work
>> correctly and not crash. It should be good enough for our use case.
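>>
>> A sketch of what I have in mind (monitor_sync would be a new zmq::mutex_t
>> member of socket_base_t; the body is abbreviated, so this is an outline
>> rather than a patch):
>>
>>     //  socket_base.cpp: serialize access to the monitor socket, which
>>     //  is currently written from IO threads and the application thread
>>     //  without any synchronization.
>>     void zmq::socket_base_t::monitor_event (int event_, int value_,
>>         const std::string &addr_)
>>     {
>>         scoped_lock_t lock (monitor_sync);  //  new mutex member
>>         if (monitor_socket) {
>>             //  ... unchanged: build the two event frames and send
>>             //  them over monitor_socket ...
>>         }
>>     }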
>>
>> An idea for a non-blocking solution would be an independent monitor class,
>> analogous to the existing listener and accepter classes, that owns an inproc
>> SUB socket and the PAIR socket. Each ZeroMQ socket would create a monitor
>> when it is created, and each session object would have a PUB socket to
>> broadcast events to the monitor. The monitor would then forward the events
>> received from individual clients/sessions on different IO threads to the
>> PAIR socket where the application code can connect.
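>>
>> In terms of the public API the fan-in stage could look roughly like this
>> (a sketch only; the endpoint names are made up, and a real implementation
>> would live inside libzmq rather than use zmq_proxy):
>>
>>     //  Runs in its own thread: collects the events the sessions publish
>>     //  from their IO threads and forwards them to the single PAIR
>>     //  endpoint the application connects to.
>>     static void monitor_loop (void *ctx)
>>     {
>>         void *sub = zmq_socket (ctx, ZMQ_SUB);
>>         zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);  //  all events
>>         zmq_bind (sub, "inproc://monitor-events");   //  sessions connect PUBs here
>>
>>         void *pair = zmq_socket (ctx, ZMQ_PAIR);
>>         zmq_bind (pair, "inproc://monitor-out");     //  application connects here
>>
>>         zmq_proxy (sub, pair, NULL);  //  forward until the context terminates
>>     }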
>>
>> Best wishes,
>>
>>   Jens
>>
>>
>>
>>
>>
>> On Oct 10, 2016 17:45, "Auer, Jens" <jens.a...@cgi.com> wrote:
>>
>> Hi,
>>
>> I have an issue with the socket monitor on a ZMQ_STREAM socket. We have
>> noticed that our software sometimes crashes with internal ZeroMQ
>> assertions when non-ZeroMQ clients disconnect.
>> I can reproduce this with a small test program that
>> - starts a context with 6 threads
>> - creates a ZMQ_SUB socket with a monitor
>> - creates a ZMQ_PAIR socket and connects it to the monitor for the SUB socket
>> - creates a ZMQ_STREAM socket and binds it
>> - creates a socket monitor with a ZMQ_PAIR socket to get events
>> - runs a loop, polling the sockets with zmq_poll
>>
>> When I start 500 instances of this program and connect 6 clients to each of
>> them (I use netcat >/dev/null), and then kill the clients, I randomly get
>> crashes with ZeroMQ assertion failures.
>> Most of the time the failure is in the code processing the socket monitor
>> event, which I copied from the man page:
>> Program terminated with signal 6, Aborted.
>> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
>> (gdb) bt
>> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
>> #1  0x00007fb564444ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007fb56443c566 in __assert_fail_base () from /lib64/libc.so.6
>> #3  0x00007fb56443c612 in __assert_fail () from /lib64/libc.so.6
>> #4  0x00000000004b16f9 in get_monitor_event (monitor=0x2512b90, value=0x0,
>> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:47
>> #5  0x00000000004b423e in main (argc=3, argv=0x7fffe10a2668) at
>> /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> Sometimes, I get an internal zmq assertion:
>> Program terminated with signal 6, Aborted.
>> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
>> (gdb) bt
>> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
>> #1  0x00007f3e75f19ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007f3e76cff769 in zmq::zmq_abort
>> (errmsg_=errmsg_@entry=0x7f3e76d328ed "check ()") at src/err.cpp:83
>> #3  0x00007f3e76d06232 in zmq::msg_t::size (this=this@entry=0x7ffd200e9a50)
>> at src/msg.cpp:254
>> #4  0x00007f3e76d2e035 in zmq_msg_size (msg_=msg_@entry=0x7ffd200e9a50) at
>> src/zmq.cpp:627
>> #5  0x00007f3e76d2e415 in s_recvmsg (s_=<optimized out>,
>> msg_=0x7ffd200e9a50, flags_=<optimized out>) at src/zmq.cpp:459
>> #6  0x00000000004b1654 in get_monitor_event (monitor=0x1d3fb90, value=0x0,
>> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
>> #7  0x00000000004b423e in main (argc=3, argv=0x7ffd200e9d68) at
>> /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> I've added assertions to the internal function calls in ZeroMQ to better
>> trace the error. It turns out that the message is already corrupted when
>> it is read from the internal pipe:
>> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
>> (gdb) bt
>> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
>> #1  0x00007fd3bc929ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007fd3bd70f769 in zmq::zmq_abort
>> (errmsg_=errmsg_@entry=0x7fd3bd742d08 "queue.front().check()") at
>> src/err.cpp:83
>> #3  0x00007fd3bd71eb1f in zmq::ypipe_t<zmq::msg_t, 256>::read
>> (this=0x1f978f0, value_=0x7ffd760e8540) at src/ypipe.hpp:346
>> #4  0x00007fd3bd71e259 in zmq::pipe_t::read (this=0x1f9f9a0,
>> msg_=msg_@entry=0x7ffd760e8540) at src/pipe.cpp:162
>> #5  0x00007fd3bd71c63e in zmq::pair_t::xrecv (this=0x1f96b90,
>> msg_=0x7ffd760e8540) at src/pair.cpp:114
>> #6  0x00007fd3bd728b5b in zmq::socket_base_t::recv (this=0x1f96b90,
>> msg_=msg_@entry=0x7ffd760e8540, flags_=0) at src/socket_base.cpp:910
>> #7  0x00007fd3bd73e809 in s_recvmsg (s_=<optimized out>,
>> msg_=0x7ffd760e8540, flags_=<optimized out>) at src/zmq.cpp:456
>> #8  0x00000000004b1654 in get_monitor_event (monitor=0x1f96b90, value=0x0,
>> address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
>> #9  0x00000000004b423e in main (argc=3, argv=0x7ffd760e8858) at
>> /home/auerj/MDAF/src/test/pepSim.cpp:268
>>
>> The assertion is triggered by additional checks in the ypipe_t class, which
>> I specialized for zmq::msg_t to add the checks. The code is:
>> 311         inline bool check_read ()
>> 312         {
>> 313             //  Was the value prefetched already? If so, return.
>> 314             if (&queue.front () != r && r)
>> 315                  return true;
>> 316
>> 317             //  There's no prefetched value, so let us prefetch more values.
>> 318             //  Prefetching is to simply retrieve the
>> 319             //  pointer from c in atomic fashion. If there are no
>> 320             //  items to prefetch, set c to NULL (using compare-and-swap).
>> 321             r = c.cas (&queue.front (), NULL);
>> 322
>> 323             //  If there are no elements prefetched, exit.
>> 324             //  During pipe's lifetime r should never be NULL, however,
>> 325             //  it can happen during pipe shutdown when items
>> 326             //  are being deallocated.
>> 327             if (&queue.front () == r || !r)
>> 328                 return false;
>> 329
>> 330             zmq_assert (queue.front ().check ());
>> 331
>> 332             //  There was at least one value prefetched.
>> 333             return true;
>> 334         }
>> 335
>> 336         //  Reads an item from the pipe. Returns false if there is no value
>> 337         //  available.
>> 338         inline bool read (T *value_)
>> 339         {
>> 340             //  Try to prefetch a value.
>> 341             if (!check_read ())
>> 342                 return false;
>> 343
>> 344             //  There was at least one value prefetched.
>> 345             //  Return it to the caller.
>> 346             zmq_assert (queue.front ().check ());
>> 347             *value_ = queue.front ();
>> 348             zmq_assert (value_->check ());
>> 349             queue.pop ();
>> 350             zmq_assert (value_->check ());
>> 351
>> 352             return true;
>> 353         }
>>
>>
>> Sometimes I get another assertion when sending the monitor event, but here
>> I am not sure whether it is caused by my modification adding the assertion.
>> The stack trace is:
>> #0  0x00007feaecc625f7 in raise () from /lib64/libc.so.6
>> #1  0x00007feaecc63ce8 in abort () from /lib64/libc.so.6
>> #2  0x00007feaeda49769 in zmq::zmq_abort
>> (errmsg_=errmsg_@entry=0x7feaeda7ccf3 "queue.back().check()") at
>> src/err.cpp:83
>> #3  0x00007feaeda58856 in zmq::ypipe_t<zmq::msg_t, 256>::write
>> (this=0xaa2670, value_=..., incomplete_=<optimized out>) at
>> src/ypipe.hpp:259
>> #4  0x00007feaeda57645 in zmq::pipe_t::write (this=0xaac1c0,
>> msg_=msg_@entry=0x7feae7ce10f0) at src/pipe.cpp:215
>> #5  0x00007feaeda56490 in zmq::pair_t::xsend (this=0xaa1ed0,
>> msg_=0x7feae7ce10f0) at src/pair.cpp:90
>> #6  0x00007feaeda62971 in zmq::socket_base_t::send
>> (this=this@entry=0xaa1ed0, msg_=msg_@entry=0x7feae7ce10f0,
>> flags_=flags_@entry=2) at src/socket_base.cpp:843
>> #7  0x00007feaeda7846c in s_sendmsg (s_=0xaa1ed0, msg_=0x7feae7ce10f0,
>> flags_=2) at src/zmq.cpp:346
>> #8  0x00007feaeda62d93 in zmq::socket_base_t::monitor_event (this=0xaa16a0,
>> event_=event_@entry=512, value_=25, addr_="tcp://192.168.120.1:9494") at
>> src/socket_base.cpp:1357
>> #9  0x00007feaeda62f1d in zmq::socket_base_t::event_disconnected
>> (this=<optimized out>, addr_="tcp://192.168.120.1:9494", fd_=<optimized
>> out>) at src/socket_base.cpp:1344
>> #10 0x00007feaeda6b45a in zmq::stream_engine_t::error
>> (this=this@entry=0x7feae00022c0,
>> reason=reason@entry=zmq::stream_engine_t::connection_error) at
>> src/stream_engine.cpp:936
>> #11 0x00007feaeda6c5db in zmq::stream_engine_t::in_event
>> (this=0x7feae00022c0) at src/stream_engine.cpp:300
>> #12 0x00007feaeda493ee in zmq::epoll_t::loop (this=0xaa0830) at
>> src/epoll.cpp:176
>> #13 0x00007feaeda73150 in thread_routine (arg_=0xaa08b0) at
>> src/thread.cpp:96
>> #14 0x00007feaed816dc5 in start_thread () from /lib64/libpthread.so.0
>> #15 0x00007feaecd23ced in clone () from /lib64/libc.so.6
>>
>> and the code in question is:
>> 253         inline void write (const T &value_, bool incomplete_)
>> 254         {
>> 255             zmq_assert (value_.check ());
>> 256
>> 257             //  Place the value to the queue, add new terminator element.
>> 258             queue.back () = value_;
>> 259             queue.push ();
>> 260             zmq_assert (queue.back ().check ());
>> 261
>> 262             //  Move the "flush up to here" pointer.
>> 263             if (!incomplete_)
>> 264                 f = &queue.back ();
>> 265         }
>>
>> I can also reproduce the crashes with a simpler test program that only has
>> the ZMQ_STREAM socket with its monitor and does not open a ZMQ_SUB socket.
>> I could not reproduce it with fewer threads, i.e. with the default of 2 IO
>> threads.
>>
>> The code for my test program is:
>> #include <cassert>
>> #include <cstdlib>
>> #include <cstring>
>> #include <iostream>
>> #include <string>
>> #include <vector>
>>
>> #include <zmq.hpp>
>>
>> static int
>> get_monitor_event(void *monitor,
>>                   int *value = nullptr,
>>                   char **address = nullptr)
>> {
>>     //  First frame in message contains event number and value
>>     zmq_msg_t msg;
>>     zmq_msg_init (&msg);
>>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>>         return -1;              //  Interrupted, presumably
>>     assert (zmq_msg_more (&msg));
>>
>>     uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
>>     uint16_t event = *(uint16_t *) (data);
>>     if (value)
>>         *value = *(uint32_t *) (data + 2);
>>     zmq_msg_close (&msg);
>>
>>     //  Second frame in message contains event address
>>     zmq_msg_init (&msg);
>>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>>         return -1;              //  Interrupted, presumably
>>     assert (!zmq_msg_more (&msg));
>>
>>     if (address) {
>>         data = (uint8_t *) zmq_msg_data (&msg);
>>         size_t size = zmq_msg_size (&msg);
>>         *address = (char *) malloc (size + 1);
>>         memcpy (*address, data, size);
>>         (*address)[size] = 0;   //  null-terminate; note the parentheses
>>     }
>>     zmq_msg_close (&msg);
>>     return event;
>> }
>>
>> int main(int argc, char* argv[])
>> {
>>     std::string const address = argv[1];
>>     std::string const port = argv[2];
>>     std::string const stream_monitor_address("inproc://stream_monitor");
>>     std::string const sub_monitor_address("inproc://sub_monitor");
>>
>>     zmq::context_t ctx(6);
>>     zmq::socket_t stream(ctx, ZMQ_STREAM);
>>     zmq::socket_t sub(ctx, ZMQ_SUB);
>>
>>     zmq_socket_monitor (static_cast<void*> (stream),
>>                         stream_monitor_address.c_str (), ZMQ_EVENT_ALL);
>>     zmq_socket_monitor (static_cast<void*> (sub),
>>                         sub_monitor_address.c_str (), ZMQ_EVENT_ALL);
>>
>>     zmq::socket_t stream_monitor(ctx, ZMQ_PAIR);
>>     zmq::socket_t sub_monitor(ctx, ZMQ_PAIR);
>>
>>     stream_monitor.connect( stream_monitor_address.c_str() );
>>     sub_monitor.connect( sub_monitor_address.c_str() );
>>
>>     stream.bind("tcp://" + address + ":" + port);
>>     //sub.connect("tcp://127.0.0.1:6000");
>>
>>     std::vector<zmq::message_t> clientIds;
>>
>>     while(1)
>>     {
>>         zmq::pollitem_t items[4] = {
>>                 {static_cast<void*> (stream_monitor), -1, ZMQ_POLLIN, 0},
>>                 {static_cast<void*> (stream), -1, ZMQ_POLLIN, 0},
>>                 {static_cast<void*> (sub_monitor), -1, ZMQ_POLLIN, 0},
>>                 {static_cast<void*> (sub), -1, ZMQ_POLLIN, 0}
>>         };
>>
>>         auto rc = zmq::poll( items, 4 );
>>
>>         if (rc != -1)
>>         {
>>             if (0 != (items[1].revents & ZMQ_POLLIN) )
>>             {
>>                 zmq::message_t id;
>>                 zmq::message_t m;
>>
>>                 stream.recv(&id);
>>                 stream.recv(&m);
>>             }
>>             if (0 != (items[3].revents & ZMQ_POLLIN) )
>>             {
>>             }
>>             if (0 != (items[0].revents & ZMQ_POLLIN))
>>             {
>>                 std::cout << "STREAM MONITOR "
>>                           << get_monitor_event (static_cast<void*> (stream_monitor))
>>                           << std::endl;
>>             }
>>             if (0 != (items[2].revents & ZMQ_POLLIN))
>>             {
>>                 std::cout << "SUB MONITOR "
>>                           << get_monitor_event (static_cast<void*> (sub_monitor))
>>                           << std::endl;
>>             }
>>         }
>>     }
>>
>>     return 0;
>> }
>>
>> It uses the C++ interface from
>> https://github.com/zeromq/cppzmq/blob/master/zmq.hpp.
>>
>> Best wishes,
>>   Jens Auer
>>