请问我想实现一对多的模式。你这边有什么好的方法吗?


On 08/13/2018 14:52, 纪明 wrote:
Hi all:

    We are using ZMQ to do some multicast work. The service keep
crashing, and we found it has something to do with pgm_receiver.

    Specifically, there is a function called
zmq::pgm_receiver_t::restart_input(), when it receives some data, it
calls decoder to decode the message. On line 132, it checks if the
message size is greater than zero. If yes, it will call process_input()
function to decode the message. However, when insize is greater than
zero, inpos could point to null. When this happens, zmq crashes when
calling memcpy to copy something to the memory that inpos points to.
This actually looks like a threading issue to us.

    We really appreciate if anyone familiar with this zmq could point
out a solution to this. We are using zmq in a real time environment,
occassional message drop is more acceptable than crashing the service.
We tried to change the source code a little bit, from "if (insize > 0)"
to "if (insize > 0 && inpos)". It caused other problem.

Thanks a lot in advance.
Ming


The relevant zmq code looks like this:

void zmq::pgm_receiver_t::restart_input ()
{
    zmq_assert (session != NULL);
    zmq_assert (active_tsi != NULL);

    const peers_t::iterator it = peers.find (*active_tsi);
    zmq_assert (it != peers.end ());
    zmq_assert (it->second.joined);

    //  Push the pending message into the session.
    int rc = session->push_msg (it->second.decoder->msg ());
    errno_assert (rc == 0);

    if (insize > 0) {
        rc = process_input (it->second.decoder);
        if (rc == -1) {
            //  HWM reached; we will try later.
            if (errno == EAGAIN) {
                session->flush ();
                return;
            }
            //  Data error. Delete message decoder, mark the
            //  peer as not joined and drop remaining data.
            it->second.joined = false;
            LIBZMQ_DELETE(it->second.decoder);
            insize = 0;
        }
    }

    //  Resume polling.
    set_pollin (pipe_handle);
    set_pollin (socket_handle);

    active_tsi = NULL;
    in_event ();
}

int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
    zmq_assert (session != NULL);

    while (insize > 0) {
        size_t n = 0;
        int rc = decoder->decode (inpos, insize, n);
        if (rc == -1)
            return -1;
        inpos += n;
        insize -= n;
        if (rc == 0)
            break;
        rc = session->push_msg (decoder->msg ());
        if (rc == -1) {
            errno_assert (errno == EAGAIN);
            return -1;
        }
    }
    return 0;
}





_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
https://lists.zeromq.org/mailman/listinfo/zeromq-dev
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
https://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to