Thanks Tim. We'll go with that for now, since we are matching the default behavior, and will just add a note saying to be careful if we do change that.
Alain

On Thu, Nov 29, 2018 at 6:23 AM Tim Ward <tim.w...@paremus.com> wrote:

> That looks reasonable to me. You could also do it using an error handler
> on the promise returned by forEach(), but what you’re doing is also fine.
>
> Depending on your threading model you may lose some events. Specifically,
> if there’s a buffer in the SimplePushEventSource that you are using (which
> there is unless you’ve worked to avoid it) with a parallelism greater than
> 1 (the default is one), then some events may be delivered into the failed
> push stream by other threads (if they are available) and discarded. If
> there is only one worker thread, or a parallelism of one, then this won’t
> happen, as your “onError” connects a new listener *before* the old one
> returns its back pressure to the SimplePushEventSource.
>
> Note that this whole reasoning goes out of the window if you add another
> buffer between the SimplePushEventSource and your error handler.
>
> I hope this helps,
>
> Tim
>
> > On 28 Nov 2018, at 23:32, Alain Picard via osgi-dev
> > <osgi-dev@mail.osgi.org> wrote:
> >
> > We ended up with an exception thrown in the forEach of our stream, which
> > is a stream to manage notifications and that should be always on. Nothing
> > got reported, but the stream stopped working. Finally, testing isConnected
> > reported false, and then we found the source of the exception.
> >
> > Digging further, we found out that there are onError and onClose, but
> > couldn't find any example. We first tried to insert it after the
> > createStream, but debugging found that this seemed to be tied to the
> > filter intermediate stream. Now, after moving the onClose, it is working,
> > but we are very unsure if the pattern is correct.
> >
> > private void handleEvents() {
> >     psp.createStream(spes) //NOSONAR as we don't close it
> >         .filter(isOfInterest())
> >         .onError(e -> {
> >             log.error(Messages.CNCI_0, e);
> >             handleEvents(); // start a new stream
> >         })
> >         .forEach(entry -> entry.listener.notify(entry.notification));
> > }
> >
> > Thanks
> > Alain
> > _______________________________________________
> > OSGi Developer Mail List
> > osgi-dev@mail.osgi.org
> > https://mail.osgi.org/mailman/listinfo/osgi-dev
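
For reference, here is a rough sketch of the alternative Tim mentions: restarting from the promise returned by forEach() rather than from onError(). To keep it runnable without the OSGi bundles, it uses only JDK types, so treat it as a model of the pattern and not as the OSGi API. TinySource is a made-up stand-in for the SimplePushEventSource, CompletableFuture stands in for the OSGi Promise, and the class, field, and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** JDK-only model of "restart from the promise returned by forEach()". */
class RestartOnFailure {

    /** Hypothetical single-listener source; NOT the OSGi SimplePushEventSource. */
    static final class TinySource<T> {
        private Consumer<? super T> listener;   // current listener, null when disconnected
        private CompletableFuture<Void> done;   // fails when the current listener throws

        /** Connect a listener; the returned future fails if the listener throws. */
        CompletableFuture<Void> forEach(Consumer<? super T> action) {
            listener = action;
            done = new CompletableFuture<>();
            return done;
        }

        boolean isConnected() {
            return listener != null;
        }

        /** Deliver one event synchronously on the calling thread. */
        void publish(T event) {
            if (listener == null) {
                return; // nobody connected: the event is discarded
            }
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                CompletableFuture<Void> failed = done;
                listener = null;                  // disconnect the failed listener first
                failed.completeExceptionally(e);  // then run callbacks, which may reconnect
            }
        }
    }

    final TinySource<String> source = new TinySource<>();
    final List<String> delivered = new ArrayList<>();
    final List<Throwable> errors = new ArrayList<>();

    /** Same shape as handleEvents() in the thread, with the handler on the future. */
    void handleEvents() {
        source.forEach(this::notifyListener)
              .whenComplete((ok, error) -> {
                  if (error != null) {
                      errors.add(error);  // stands in for log.error(...)
                      handleEvents();     // start a new stream
                  }
              });
    }

    /** Stand-in for entry.listener.notify(...); rejects empty notifications. */
    private void notifyListener(String event) {
        if (event.isEmpty()) {
            throw new IllegalArgumentException("empty notification");
        }
        delivered.add(event);
    }

    public static void main(String[] args) {
        RestartOnFailure demo = new RestartOnFailure();
        demo.handleEvents();
        demo.source.publish("a");
        demo.source.publish("");  // listener throws, failure callback reconnects
        demo.source.publish("b");
        System.out.println(demo.delivered + " connected=" + demo.source.isConnected());
    }
}
```

In this single-threaded model the failure callback reconnects before publish() returns, so later events still arrive; the event that caused the failure is not redelivered. It does not model the buffered, multi-threaded case Tim describes, where events can be delivered into the failed stream and discarded.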