Re: [osgi-dev] Push Stream reset/flush howto

2018-11-15 Thread Tim Ward via osgi-dev
> So are you proposing something like this: process an event, decide that you 
> have to re-query, fail the promise with a “CacheInvalidatedException”, then 
> perform the re-query in the recovery function and just keep processing 
> events on the stream?

Technically it would be a new PushStream operating on the results of the 
re-query, but yes. The result would then be folded into the Promise chained 
from the recoverWith() call.

Just to be extra brain-bending, the recovery function may also apply itself to 
the Promise it returns. That way you’ll continuously retry whenever the 
“CacheInvalidatedException” occurs.

For example:

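public <T> Promise<T> recoveryFunction(Promise<?> p) throws InterruptedException {
    // getFailure() waits for p to resolve and returns null if it succeeded
    Throwable t = p.getFailure();

    if (t instanceof CacheInvalidatedException) {
        Promise<T> newValue = reQuery();
        // This will repeatedly retry every time you get a cache invalidation
        return newValue.recoverWith(this::recoveryFunction);
    }
    // Returning null leaves any other failure in place
    return null;
}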


Tim

> On 15 Nov 2018, at 12:43, Alain Picard wrote:
> 
> On Thu, Nov 15, 2018 at 7:25 AM Tim Ward wrote:
>>> My expectation is that after the "flush error event", I can again accept 
>>> new published events and process those until I get another case where the 
>>> cached information is invalidated (i.e. the notification event changes the 
>>> result set and there is no way to simply update the cache and we have to 
>>> re-run the query).
>> 
>> So you can’t do this with the same PushStream instance. This is because the 
>> PushStream instance is terminated after a terminal event (which an error 
>> event is). This does, however, give you the opportunity to re-run the query.
> 
> Got that, and that is why I was confused.
> 
>> One way to achieve this is by registering a recovery function with the 
>> terminal promise from the PushStream. If the Promise fails with a 
>> “CacheInvalidatedException” you can re-run the same PushStream flow again 
>> (and again…) until the cache is no longer invalidated.
> 
> So are you proposing something like this: process an event, decide that you 
> have to re-query, fail the promise with a “CacheInvalidatedException”, then 
> perform the re-query in the recovery function and just keep processing 
> events on the stream?
> 
>> Does that make sense?
>> 
>> Tim

Re: [osgi-dev] Push Stream reset/flush howto

2018-11-15 Thread Tim Ward via osgi-dev
> My expectation is that after the "flush error event", I can again accept new 
> published events and process those until I get another case where the cached 
> information is invalidated (i.e. the notification event changes the result 
> set and there is no way to simply update the cache and we have to re-run the 
> query).

So you can’t do this with the same PushStream instance. This is because the 
PushStream instance is terminated after a terminal event (which an error event 
is). This does, however, give you the opportunity to re-run the query.

One way to achieve this is by registering a recovery function with the terminal 
promise from the PushStream. If the Promise fails with a 
“CacheInvalidatedException” you can re-run the same PushStream flow again (and 
again…) until the cache is no longer invalidated.
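
The retry loop described above can be sketched with JDK classes alone. This is
an analogue rather than OSGi code: java.util.concurrent.CompletableFuture
stands in for the OSGi Promise, handle() followed by thenCompose() stands in
for recoverWith(), and RetryingQuery, reQuery() and withRecovery() are
hypothetical names. CacheInvalidatedException is the application-defined
exception from this thread, declared here only so the example compiles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Application-defined exception (hypothetical declaration).
class CacheInvalidatedException extends RuntimeException {
}

class RetryingQuery {
    // Counts how many times the query has been run.
    final AtomicInteger attempts = new AtomicInteger();
    private final int invalidations;

    // The first `invalidations` attempts fail, simulating cache invalidation.
    RetryingQuery(int invalidations) {
        this.invalidations = invalidations;
    }

    // Stand-in for "re-run the query and process a fresh stream".
    CompletableFuture<String> reQuery() {
        int n = attempts.incrementAndGet();
        CompletableFuture<String> result = new CompletableFuture<>();
        if (n <= invalidations) {
            result.completeExceptionally(new CacheInvalidatedException());
        } else {
            result.complete("result after " + n + " attempts");
        }
        return result;
    }

    // Applies itself to every new attempt, so each invalidation triggers
    // another re-query. Successes and other failures pass through unchanged.
    CompletableFuture<String> withRecovery(CompletableFuture<String> attempt) {
        return attempt.handle((value, failure) -> {
            Throwable cause = failure instanceof CompletionException
                    ? failure.getCause() : failure;
            if (cause instanceof CacheInvalidatedException) {
                return withRecovery(reQuery());
            }
            return attempt;
        }).thenCompose(next -> next);
    }
}
```

With new RetryingQuery(2), the future returned by withRecovery(reQuery())
completes only after the third attempt, because the first two fail with
CacheInvalidatedException and are retried.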

Does that make sense?

Tim

> On 15 Nov 2018, at 11:22, Alain Picard wrote:
> 
> Tim,
> 
> One thing I'm unsure about in your suggestion: my expectation is that after 
> the "flush error event", I can again accept new published events and process 
> those until I get another case where the cached information is invalidated 
> (i.e. the notification event changes the result set and there is no way to 
> simply update the cache and we have to re-run the query). I am unclear as to 
> how this would happen.
> 
> 
> Thanks
> Alain

___
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev

Re: [osgi-dev] Push Stream reset/flush howto

2018-11-15 Thread Tim Ward via osgi-dev
The correct option will depend on what you want to happen. If you use an 
endOfStream() or close() operation then you are telling your push stream that 
the data has reached a “natural end”. This will cause the promise at the end of 
your stream to resolve normally. This may be the right thing in some cases.

In the case that you describe I agree that endOfStream() and close() don’t seem 
like the correct approach. The data hasn’t reached a natural conclusion, and in 
fact the processing so far has been invalidated. This is the perfect 
opportunity to send an error event! You can send an Exception indicating that 
the result set was not completely processed, and potentially has been 
invalidated. The default behaviour of the push stream will then be to fail the 
pipeline; however, a terminal “forEachEvent” handler could still choose to do 
something useful with the information. For example, it might choose to trigger 
recreation of the stream against the updated dataset!
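
To make the difference between the two kinds of terminal event concrete, here
is a toy model written against the JDK only. ToyStream and its methods are
hypothetical and are not the org.osgi.util.pushstream API; the
CompletableFuture plays the role of the promise at the end of the stream. A
close completes it normally, an error fails it, and events published after
either one are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy stand-in for a push stream with a terminal promise.
class ToyStream {
    // Data events accepted before the terminal event.
    final List<String> seen = new ArrayList<>();
    private final CompletableFuture<Integer> terminal = new CompletableFuture<>();

    // Data event: dropped once a terminal event has arrived.
    void publish(String item) {
        if (!terminal.isDone()) {
            seen.add(item);
        }
    }

    // Close event: the data reached its natural end, so the future succeeds.
    void close() {
        terminal.complete(seen.size());
    }

    // Error event: also terminal, but the future fails with the given cause.
    void error(Throwable cause) {
        terminal.completeExceptionally(cause);
    }

    // Describes how the stream ended; blocks until a terminal event arrives.
    String outcome() {
        return terminal.handle((count, failure) -> failure == null
                ? "completed: " + count
                : "failed: " + failure.getMessage()).join();
    }
}
```

A caller that sees the failed outcome knows the result set was not fully
processed and can build a new stream against the updated data, which a normal
completion would not tell it.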

I hope this helps,

Tim


[osgi-dev] Push Stream reset/flush howto

2018-11-15 Thread Alain Picard via osgi-dev
We are using a push stream to process data change notifications against a
cached result set. Some of those notifications can result in directly
applying updates to the result set, while others will force us to invalidate
the cached result set.

When we do a requery, we want to make sure that any subsequent events sent
to the push stream can be cleared and ignored. Looking at endOfStream() or
close() doesn't seem to be the way to go. The only solution for now is to
switch to a new stream, but I am wondering if that is the right way to do it.

Regards,
Alain