> So are you proposing something like process an event and decide that you will
> have to re-query, fails the promise with a “CacheInvalidatedException” and
> then in the recovery function perform the re-query and then just keep
> processing events on the stream?
Technically it would be a new PushStream operating on the results of the
re-query, but yes. The result would then be folded into the Promise chained
from the the recover() call.
Just to be extra brain-bending, the recovery function may also apply itself to
the Promise it returns, that way you’ll continuously retry whenever the
“CacheInvalidatedException" occurs.
For example:
public <T> Promise<T> recoveryFunction(Promise<?> p) {
Throwable t = p.getFailure();
if(t instanceof CacheInvalidatedException) {
Promise<T> newValue = reQuery();
// This will repeatedly retry every time you get a Cache invalidation
return newValue.recoverWith(this::recoveryFunction);
}
}
Tim
> On 15 Nov 2018, at 12:43, Alain Picard <[email protected]> wrote:
>
> On Thu, Nov 15, 2018 at 7:25 AM Tim Ward <[email protected]
> <mailto:[email protected]>> wrote:
>> My expectation is that after the "flush error event", I can again accept
>> new published event and process those until I get another case where the
>> cached information is invalidated (i.e. the notification event changes the
>> result set and there is no way to simply update the cache and we have to
>> re-run the query).
>
> So you can’t do this with the same PushStream instance. This is because the
> Pushstream instance is terminated after a terminal event (which an error
> event is). This does, however, give you the opportunity to re-run the query.
> Got that, and why I was confused.
>
> One way to achieve this is by registering a recovery function with the
> terminal promise from the PushStream. If the Promise fails with a
> “CacheInvalidatedException” you can re-run the same PushStream flow again
> (and again…) until the cache is no longer updated.
> So are you proposing something like process an event and decide that you will
> have to re-query, fails the promise with a “CacheInvalidatedException” and
> then in the recovery function perform the re-query and then just keep
> processing events on the stream?
>
> Does that make sense?
>
> Tim
>
>> On 15 Nov 2018, at 11:22, Alain Picard <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Tim,
>>
>> One thing that I'm unsure about your suggestion. My expectation is that
>> after the "flush error event", I can again accept new published event and
>> process those until I get another case where the cached information is
>> invalidated (i.e. the notification event changes the result set and there is
>> no way to simply update the cache and we have to re-run the query). I am
>> unclear as to how this would happen.
>>
>>
>> Thanks
>> Alain
>>
>>
>> On Thu, Nov 15, 2018 at 5:52 AM Tim Ward <[email protected]
>> <mailto:[email protected]>> wrote:
>> The correct option will depend on what you want to happen. If you use an
>> endOfStream() or close() operation then you are telling your push stream
>> that the data has reached a “natural end”. This will cause the promise at
>> the end of your stream to resolve normally. This may be the right thing in
>> some cases.
>>
>> In the case that you describe I agree that endOfStream() and close() don’t
>> seem like the correct approach. The data hasn’t reached a natural
>> conclusion, and in fact the processing so far has been invalidated. This is
>> the perfect opportunity to send an error event! You can send an Exception
>> indicating that the result set was not completely processed, and potentially
>> has been invalidated. The default behaviour of the push stream will then be
>> to fail the pipeline, however a terminal “forEachEvent” handler could still
>> choose to do something useful with the information. For example it might
>> choose to trigger recreation of the stream against the updated dataset!
>>
>> I hope this helps,
>>
>> Tim
>>
>> > On 15 Nov 2018, at 09:52, Alain Picard via osgi-dev
>> > <[email protected] <mailto:[email protected]>> wrote:
>> >
>> > We are using a push stream to process data change notifications against a
>> > cached result set. Some of those notifications can result in directly
>> > applying updates to the result set, while other will force us to
>> > invalidate the cached result set.
>> >
>> > When we do a requery, we want to make sure that any subsequent event sent
>> > to the push stream can be cleared and ignore. Looking at endOfStream() or
>> > close() doesn't seem to be the way to go. Only solution for now is to
>> > switch to a new stream, but wondering if that is the right way to do it.
>> >
>> > Regards,
>> > Alain
>> > _______________________________________________
>> > OSGi Developer Mail List
>> > [email protected] <mailto:[email protected]>
>> > https://mail.osgi.org/mailman/listinfo/osgi-dev
>> > <https://mail.osgi.org/mailman/listinfo/osgi-dev>
>>
>
_______________________________________________
OSGi Developer Mail List
[email protected]
https://mail.osgi.org/mailman/listinfo/osgi-dev