> So are you proposing something like: process an event, decide that you will 
> have to re-query, fail the promise with a “CacheInvalidatedException”, and 
> then in the recovery function perform the re-query and then just keep 
> processing events on the stream?

Technically it would be a new PushStream operating on the results of the 
re-query, but yes. The result would then be folded into the Promise chained 
from the recover() call.

Just to be extra brain-bending, the recovery function may also apply itself to 
the Promise it returns; that way you’ll continuously retry whenever the 
“CacheInvalidatedException” occurs.

For example:

public <T> Promise<T> recoveryFunction(Promise<?> p) throws InterruptedException {
    // The promise has already resolved by the time the recovery function
    // runs, so getFailure() won't block, but it declares InterruptedException
    Throwable t = p.getFailure();

    if (t instanceof CacheInvalidatedException) {
        Promise<T> newValue = reQuery();
        // Re-registering this function means the re-query will itself be
        // retried every time you get a cache invalidation
        return newValue.recoverWith(this::recoveryFunction);
    }
    // Returning null tells recoverWith not to recover this failure,
    // so it propagates unchanged
    return null;
}
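To show the shape of that self-re-registering recovery outside OSGi, here is a minimal sketch of the same retry pattern using plain `CompletableFuture` from the JDK, so it runs without the OSGi Promise library. `reQuery()`, `withRetry()`, the attempt counter and the nested `CacheInvalidatedException` are illustrative stand-ins, not real API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    // Hypothetical stand-in for the CacheInvalidatedException above
    static class CacheInvalidatedException extends RuntimeException {}

    static final AtomicInteger attempts = new AtomicInteger();

    // Stand-in for re-running the query: fails with a cache
    // invalidation on the first two attempts, then succeeds
    static CompletableFuture<String> reQuery() {
        if (attempts.incrementAndGet() < 3) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new CacheInvalidatedException());
            return failed;
        }
        return CompletableFuture.completedFuture("result-set");
    }

    // Analogue of recoverWith(this::recoveryFunction): on a cache
    // invalidation, re-query and re-register the same recovery, so
    // the retry repeats as often as the invalidation occurs
    static CompletableFuture<String> withRetry(CompletableFuture<String> p) {
        return p.handle((value, t) -> {
            if (t == null) {
                return CompletableFuture.completedFuture(value);
            }
            // Unwrap CompletionException if a prior stage wrapped it
            Throwable cause =
                (t instanceof CompletionException && t.getCause() != null)
                    ? t.getCause() : t;
            if (cause instanceof CacheInvalidatedException) {
                return withRetry(reQuery());
            }
            // Any other failure propagates unchanged
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }).thenCompose(f -> f);
    }

    public static void main(String[] args) {
        // Two invalidations are absorbed before the query succeeds
        System.out.println(withRetry(reQuery()).join());
    }
}
```

The OSGi version is tidier because recoverWith already distinguishes “recovered” (non-null return) from “not recovered” (null return), whereas CompletableFuture needs the handle/thenCompose pair to express the same thing.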


Tim

> On 15 Nov 2018, at 12:43, Alain Picard <[email protected]> wrote:
> 
> On Thu, Nov 15, 2018 at 7:25 AM Tim Ward <[email protected]> wrote:
>>  My expectation is that after the "flush error event", I can again accept 
>> new published events and process those until I get another case where the 
>> cached information is invalidated (i.e. the notification event changes the 
>> result set and there is no way to simply update the cache and we have to 
>> re-run the query).
> 
> So you can’t do this with the same PushStream instance. This is because the 
> PushStream instance is terminated after a terminal event (which an error 
> event is). This does, however, give you the opportunity to re-run the query.
> Got that, and that is why I was confused.
> 
> One way to achieve this is by registering a recovery function with the 
> terminal promise from the PushStream. If the Promise fails with a 
> “CacheInvalidatedException” you can re-run the same PushStream flow again 
> (and again…) until the cache is no longer updated.
> So are you proposing something like: process an event, decide that you will 
> have to re-query, fail the promise with a “CacheInvalidatedException”, and 
> then in the recovery function perform the re-query and then just keep 
> processing events on the stream?
> 
> Does that make sense?
> 
> Tim
> 
>> On 15 Nov 2018, at 11:22, Alain Picard <[email protected]> wrote:
>> 
>> Tim,
>> 
>> One thing that I'm unsure about in your suggestion. My expectation is that 
>> after the "flush error event", I can again accept new published events and 
>> process those until I get another case where the cached information is 
>> invalidated (i.e. the notification event changes the result set and there is 
>> no way to simply update the cache and we have to re-run the query). I am 
>> unclear as to how this would happen.
>> 
>> 
>> Thanks
>> Alain
>> 
>> 
>> On Thu, Nov 15, 2018 at 5:52 AM Tim Ward <[email protected]> wrote:
>> The correct option will depend on what you want to happen. If you use an 
>> endOfStream() or close() operation then you are telling your push stream 
>> that the data has reached a “natural end”. This will cause the promise at 
>> the end of your stream to resolve normally. This may be the right thing in 
>> some cases.
>> 
>> In the case that you describe I agree that endOfStream() and close() don’t 
>> seem like the correct approach. The data hasn’t reached a natural 
>> conclusion, and in fact the processing so far has been invalidated. This is 
>> the perfect opportunity to send an error event! You can send an Exception 
>> indicating that the result set was not completely processed, and potentially 
>> has been invalidated. The default behaviour of the push stream will then be 
>> to fail the pipeline, however a terminal “forEachEvent” handler could still 
>> choose to do something useful with the information. For example it might 
>> choose to trigger recreation of the stream against the updated dataset!
>> 
>> I hope this helps,
>> 
>> Tim
>> 
>> > On 15 Nov 2018, at 09:52, Alain Picard via osgi-dev <[email protected]> wrote:
>> > 
>> > We are using a push stream to process data change notifications against a 
>> > cached result set. Some of those notifications can result in directly 
>> > applying updates to the result set, while others will force us to 
>> > invalidate the cached result set.
>> > 
>> > When we do a requery, we want to make sure that any subsequent event sent 
>> > to the push stream can be cleared and ignored. Looking at endOfStream() or 
>> > close() doesn't seem to be the way to go. The only solution for now is to 
>> > switch to a new stream, but we wonder if that is the right way to do it.
>> > 
>> > Regards,
>> > Alain
>> > _______________________________________________
>> > OSGi Developer Mail List
>> > [email protected]
>> > https://mail.osgi.org/mailman/listinfo/osgi-dev
>> 
> 
