That's a good point. I'm actually not familiar with the inner workings of zmq at
all; this is my first time digging into the code to fix a problem, which is why
I suspected my fix was not complete anyway.
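A tiny standalone model makes Martin's example concrete. It is only a sketch under assumptions: pipes are plain strings, `model_t`, `terminated_earlier_patch`, `regions_preserved` and `martin_scenario` are made-up names rather than zmq API, and `erase` is assumed to behave the way the removal in dist_t::terminated is described further down in this thread, moving the last pipe into the freed slot. It replays the 8-pipe case (2 matching, 4 active, 6 eligible, 2 congested) with the earlier patch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of the pipe array in zmq::dist_t; NOT the real class.
// The counters are nested prefixes of the array:
//   [0, matching) matching, [0, active) active, [0, eligible) eligible,
//   [eligible, size) everything else (e.g. congested pipes).
struct model_t
{
    std::vector<std::string> pipes; // names stand in for pipe_t pointers
    std::size_t matching, active, eligible;

    std::size_t index (const std::string &p) const
    {
        for (std::size_t i = 0; i < pipes.size (); ++i)
            if (pipes[i] == p)
                return i;
        return pipes.size (); // not found
    }

    void swap (std::size_t a, std::size_t b) { std::swap (pipes[a], pipes[b]); }

    // Assumed removal behaviour: the last pipe is moved into the freed slot.
    void erase (std::size_t i)
    {
        assert (i < pipes.size ());
        pipes[i] = pipes.back ();
        pipes.pop_back ();
    }

    // 0 = matching, 1 = active only, 2 = eligible only, 3 = not eligible.
    int region (const std::string &p) const
    {
        const std::size_t i = index (p);
        return i < matching ? 0 : i < active ? 1 : i < eligible ? 2 : 3;
    }
};

// The earlier patch (quoted further down in this thread), transcribed onto the model.
void terminated_earlier_patch (model_t &d, const std::string &pipe)
{
    if (d.index (pipe) < d.matching)
        d.matching--;
    if (d.index (pipe) < d.active)
        d.active--;
    bool swap_eligible = false;
    if (d.index (pipe) < d.eligible) {
        d.eligible--;
        swap_eligible = true;
    }
    if (swap_eligible) {
        d.swap (d.index (pipe), d.eligible);
        d.erase (d.eligible);
    } else {
        d.erase (d.index (pipe));
    }
}

// True if every surviving pipe is still in the region it started in.
bool regions_preserved (const model_t &before, const model_t &after,
                        const std::string &gone)
{
    for (const std::string &p : before.pipes)
        if (p != gone && before.region (p) != after.region (p))
            return false;
    return true;
}

// Martin's scenario: 8 pipes, 2 matching, 4 active, 6 eligible, 2 congested.
model_t martin_scenario ()
{
    return model_t {{"p0", "p1", "p2", "p3", "p4", "p5", "p6", "p7"}, 2, 4, 6};
}
```

Terminating "p0", the first active pipe, with terminated_earlier_patch leaves p5, which was only eligible, at index 0 where it now counts as matching, while p3 drops out of the active range.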
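Now I have the new version, as you suggested:

void zmq::dist_t::terminated (pipe_t *pipe_)
{
    //  Move the terminating pipe out of the matching, active and eligible
    //  ranges in turn by swapping it with the last pipe of each range, so
    //  that it lies beyond the eligible boundary when it is erased.
    if (pipes.index (pipe_) < matching) {
        pipes.swap (pipes.index (pipe_), matching - 1);
        matching--;
    }
    if (pipes.index (pipe_) < active) {
        pipes.swap (pipes.index (pipe_), active - 1);
        active--;
    }
    if (pipes.index (pipe_) < eligible) {
        pipes.swap (pipes.index (pipe_), eligible - 1);
        eligible--;
    }
    pipes.erase (pipe_);
}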
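To sanity-check the new version, the swap sequence can be replayed on a toy model. This is a sketch under assumptions: pipes are plain strings, erase is assumed to move the last pipe into the freed slot (as the removal in dist_t::terminated is described further down in this thread), and `model_t`, `terminated_fixed`, `regions_preserved` and `all_terminations_safe` are hypothetical names, not zmq API. It terminates each of the eight pipes in Martin's scenario in turn and checks that every surviving pipe stays in its original region.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of the pipe array in zmq::dist_t; NOT the real class.
// [0, matching) matching, [0, active) active, [0, eligible) eligible.
struct model_t
{
    std::vector<std::string> pipes; // names stand in for pipe_t pointers
    std::size_t matching, active, eligible;

    std::size_t index (const std::string &p) const
    {
        for (std::size_t i = 0; i < pipes.size (); ++i)
            if (pipes[i] == p)
                return i;
        return pipes.size (); // not found
    }

    void swap (std::size_t a, std::size_t b) { std::swap (pipes[a], pipes[b]); }

    // Assumed removal behaviour: the last pipe is moved into the freed slot.
    void erase (std::size_t i)
    {
        assert (i < pipes.size ());
        pipes[i] = pipes.back ();
        pipes.pop_back ();
    }

    // 0 = matching, 1 = active only, 2 = eligible only, 3 = not eligible.
    int region (const std::string &p) const
    {
        const std::size_t i = index (p);
        return i < matching ? 0 : i < active ? 1 : i < eligible ? 2 : 3;
    }
};

// Swap the pipe to the end of each range it belongs to, shrinking the range,
// then erase it from the non-eligible tail.
void terminated_fixed (model_t &d, const std::string &pipe)
{
    if (d.index (pipe) < d.matching) {
        d.swap (d.index (pipe), d.matching - 1);
        d.matching--;
    }
    if (d.index (pipe) < d.active) {
        d.swap (d.index (pipe), d.active - 1);
        d.active--;
    }
    if (d.index (pipe) < d.eligible) {
        d.swap (d.index (pipe), d.eligible - 1);
        d.eligible--;
    }
    d.erase (d.index (pipe));
}

// True if every surviving pipe is still in the region it started in.
bool regions_preserved (const model_t &before, const model_t &after,
                        const std::string &gone)
{
    for (const std::string &p : before.pipes)
        if (p != gone && before.region (p) != after.region (p))
            return false;
    return true;
}

// Terminates each pipe of Martin's scenario in turn (8 pipes: 2 matching,
// 4 active, 6 eligible); true if no case misclassifies a surviving pipe.
bool all_terminations_safe ()
{
    const model_t start {{"p0", "p1", "p2", "p3", "p4", "p5", "p6", "p7"}, 2, 4, 6};
    for (const std::string &gone : start.pipes) {
        model_t after = start;
        terminated_fixed (after, gone);
        if (after.pipes.size () + 1 != start.pipes.size ()
            || !regions_preserved (start, after, gone))
            return false;
    }
    return true;
}
```

The property that makes this work is that after the three swaps the terminating pipe sits at or beyond `eligible`, so the pipe that erase pulls into its slot also comes from the non-eligible tail. It is also why the last swap targets `eligible - 1`, the last eligible slot: swapping with `eligible` can pull a non-eligible pipe into the eligible range.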
----- Original Message -----
From: Martin Hurton
Sent: 03/21/13 07:19 PM
To: ZeroMQ development list
Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
Hi Winston,

Thanks for the patch. I think there are still some cases which are not handled,
though. For example, say there are 8 pipes handled by the dist: 2 are matching,
4 are active, 6 are eligible and the last two are congested. What happens when
the first active pipe terminates? I would suggest fixing the terminated method
so that the terminating pipe is moved beyond the eligible pointer the same way
as in the write method. Then it can be safely erased. What do you think?

- Martin

> void zmq::dist_t::terminated (pipe_t *pipe_)
> {
>     //  Remove the pipe from the list; adjust number of matching, active and/or
>     //  eligible pipes accordingly.
>     if (pipes.index (pipe_) < matching)
>         matching--;
>     if (pipes.index (pipe_) < active)
>         active--;
>     bool swapEligible = false;
>     if (pipes.index (pipe_) < eligible) {
>         eligible--;
>         swapEligible = true;
>     }
>     if (swapEligible) {
>         pipes.swap(pipes.index(pipe_), eligible);
>         pipes.erase(eligible);
>     } else {
>         pipes.erase (pipe_);
>     }
> }
>
> ----- Original Message -----
> From: Pieter Hintjens
> Sent: 03/21/13 10:06 AM
> To: ZeroMQ development list
> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>
> Hi Winston,
>
> Great analysis of the problem! Would you like to send a pull request
> with a patch?
>
> -Pieter
>
> On Thu, Mar 21, 2013 at 2:43 PM, Winston Huang <[email protected]> wrote:
>> Pieter,
>>
>> Thanks for your reply. I think I might have found the problem. I have an
>> XPUB socket that has about 10 subscribers. The following events happened:
>>
>> 1) One subscriber's HWM was reached and its pipe was moved to the end of
>> the pipes (in zmq::dist_t::write).
>>
>> 2) Another subscriber was terminated (zmq::xpub_t::xterminated), causing
>> its pipe to be removed from the dist. However, zmq::dist_t::terminated
>> removed the terminated pipe by moving the last pipe into the removed
>> pipe's spot, so the pipe deactivated in step 1 was moved toward the front
>> of the pipes. At the same time, the values of eligible and active were
>> decremented, so the last eligible pipe (which sat in front of the
>> deactivated pipe before this event) became ineligible, and it will not
>> receive any messages after this.
>>
>> Let me know if this makes any sense.
>> It's hard for me to write a standalone test case like this. I hope my
>> explanation is clear. If you can suggest any fix, let me know. One fix I
>> can see is to swap the terminated pipe with the last eligible pipe and then
>> delete that position.
>>
>> Thanks,
>> Winston
>>
>> ----- Original Message -----
>> From: Pieter Hintjens
>> Sent: 03/15/13 09:15 AM
>> To: ZeroMQ development list
>> Subject: Re: [zeromq-dev] subscriber stopped receiving messages from XPUB socket
>>
>> It sounds like a problem in the subscription forwarding, yet it's not
>> clear how a subscriber could be affected by the publisher restarting,
>> with the proxy in between.
>>
>> Do you need the proxy at all? First thing would be to connect
>> subscribers directly to the publisher. If the problem then still
>> happens we can try to make a reproducible test case.
>>
>> -Pieter
>>
>> On Fri, Mar 15, 2013 at 4:59 AM, Winston Huang <[email protected]> wrote:
>>> Hi there,
>>>
>>> I have multiple (5-10) subscribers subscribing to the same topic published
>>> by one publisher. They are connected via an XSUB-XPUB proxy. All the
>>> subscribers are always up, and the publisher may come and go at times. I
>>> have noticed that at times, after the publisher is restarted, one of the
>>> subscribers might stop receiving any messages at all. It's not a
>>> slow-joiner kind of issue, because the publisher continues to publish a
>>> message every second and that subscriber may never get any messages at
>>> all, whereas other subscribers are getting messages at the same time. I
>>> also verified that the subscriber is waiting for messages (it's calling
>>> the receive function), and if I restart the subscriber, it will get
>>> messages again.
>>>
>>> Could someone enlighten me as to what I may be doing wrong? Is there
>>> anything I should be looking into?
>>>
>>> Thanks in advance.
>>> Winston
>>> _______________________________________________
>>> zeromq-dev mailing list
>>> [email protected]
>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev