Thanks Tim. We'll go with that for now, as we are matching the default
behavior, and will just add a note documenting that we need to be careful
if we ever change that.

Alain


On Thu, Nov 29, 2018 at 6:23 AM Tim Ward <tim.w...@paremus.com> wrote:

> That looks reasonable to me. You could also do it using an error handler
> on the promise returned by forEach(), but what you’re doing is also fine.
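
To picture the alternative mentioned above (handling the failure on the promise returned by forEach() instead of using onError), here is a rough JDK-only sketch. CompletableFuture stands in for the OSGi Promise, and the class ForEachFailureSketch with its forEach helper is hypothetical, not part of the PushStream API. With the real API the handler would presumably be registered on the Promise that forEach() returns (for example via Promise.onFailure, if your Promise version provides it).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Stand-in sketch: CompletableFuture plays the role of the OSGi Promise
// returned by PushStream.forEach(). None of these names are OSGi API.
class ForEachFailureSketch {
    final List<String> delivered = new ArrayList<>();
    final List<Throwable> failures = new ArrayList<>();

    // Hypothetical terminal operation: consumes events in order and returns
    // a future that fails if the consumer throws.
    static CompletableFuture<Void> forEach(Iterator<String> events,
                                           Consumer<String> consumer) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            while (events.hasNext()) {
                consumer.accept(events.next());
            }
            done.complete(null);
        } catch (RuntimeException ex) {
            done.completeExceptionally(ex);
        }
        return done;
    }

    void handleEvents(Iterator<String> events) {
        forEach(events, event -> {
            if (event.equals("bad")) {
                throw new IllegalStateException("listener failed on " + event);
            }
            delivered.add(event);
        }).whenComplete((ok, err) -> {
            if (err != null) {
                failures.add(err);    // report the failure instead of losing it
                handleEvents(events); // start a new "stream" on what is left
            }
        });
    }
}
```

Calling handleEvents with the events "a", "bad", "b" records one failure and still delivers "a" and "b", because the failure handler restarts consumption on the remaining events.
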
>
> Depending on your threading model you may lose some events. Specifically,
> if there’s a buffer in the SimplePushEventSource that you are using (which
> there is unless you’ve worked to avoid it) with a parallelism greater than
> 1 (the default is one), then some events may be delivered into the failed
> push stream by other threads (if they are available) and discarded. If
> there is only one worker thread, or a parallelism of one, then this won’t
> happen, as your “onError” connects a new listener *before* the old one
> returns its back pressure to the SimplePushEventSource.
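
The difference between a parallelism of one and a higher value can be illustrated with a toy model. This is a deliberately simplified, single-threaded sketch and not the PushStream implementation: the class ParallelismLossSketch is hypothetical, and "parallelism" is modelled as the number of buffered events handed to the current listener before the error handler gets a chance to connect a new one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only (not OSGi API): each round hands up to `parallelism`
// buffered events to the listener that was connected when the round began.
class ParallelismLossSketch {

    // Returns the events that reached a working listener.
    static List<String> run(List<String> events, int parallelism) {
        Deque<String> buffer = new ArrayDeque<>(events);
        List<String> delivered = new ArrayList<>();
        while (!buffer.isEmpty()) {
            boolean failed = false; // state of this round's listener
            for (int i = 0; i < parallelism && !buffer.isEmpty(); i++) {
                String event = buffer.poll();
                if (failed) {
                    continue; // pushed into the failed stream: discarded
                }
                if (event.equals("bad")) {
                    failed = true; // listener throws, its stream is closed
                } else {
                    delivered.add(event);
                }
            }
            // Next round: the error handler has connected a fresh listener.
        }
        return delivered;
    }
}
```

In this model, the events "a", "bad", "b", "c", "d" with a parallelism of 1 lose only the failing event, while a parallelism of 3 also discards "b", which was already in flight towards the failed stream.
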
>
> Note that this whole reasoning goes out of the window if you add another
> buffer between the SimplePushEventSource and your error handler.
>
> I hope this helps,
>
> Tim
>
> > On 28 Nov 2018, at 23:32, Alain Picard via osgi-dev <
> osgi-dev@mail.osgi.org> wrote:
> >
> > We ended up with an exception thrown in the forEach of our stream,
> > which is a stream that manages notifications and should always be on.
> > Nothing got reported, but the stream stopped working. Eventually,
> > testing isConnected reported false, and we then found the source of
> > the exception.
> >
> > Digging further, we found that there are onError and onClose, but we
> > couldn't find any examples. At first we tried to insert it right after
> > the createStream, but debugging showed that this seemed to be tied to
> > the filter intermediate stream. Now, after moving the onClose, it is
> > working, but we are very unsure if the pattern is correct.
> >
> > private void handleEvents() {
> >    psp.createStream(spes) //NOSONAR as we don't close it
> >       .filter(isOfInterest())
> >       .onError(e -> {
> >           log.error(Messages.CNCI_0, e);
> >           handleEvents(); // start a new stream
> >       })
> >       .forEach(entry -> entry.listener.notify(entry.notification));
> > }
> >
> > Thanks
> > Alain
> > _______________________________________________
> > OSGi Developer Mail List
> > osgi-dev@mail.osgi.org
> > https://mail.osgi.org/mailman/listinfo/osgi-dev
>
>