Re: [osgi-dev] Removing queued events in Push Steams

2019-03-16 Thread Tim Ward via osgi-dev
Hi - I’m afraid that I’m out on vacation this week, so I can’t put quite as 
much into my reply as I normally would. 

It sounds like you have a PushStream pipeline with multiple buffered stages. 
This isn’t a problem, and can be a good thing, but as you’re noticing it is 
leading to the following behaviour:

• The event gets published to the PushStream
• The event is queued into the initial buffer
• The event is dequeued from the buffer by a worker thread which starts pushing 
the event through the pipeline
• The event hits the window stage, which puts the event into the “window” and 
returns zero back pressure. 
• The worker thread returns and is immediately ready to process more events 
from the buffer

At some point later the window ends:

• The PushStream worker thread is given the job of pushing the window through 
the rest of the pipeline.

Effectively the reason that your Queue Policy isn’t seeing any events is that 
there is no queue - the stages up to the window are able to easily keep pace 
with the event arrival rate, probably because the pipeline is short and they 
have nothing to do. 

In this situation you therefore need to look elsewhere for your control because 
it’s not a queueing problem. 

Probably the correct outcome would be to have a stream set up like so:

———

Stage 1: “split” the PushStream into two, one for high priority events (we’ll 
call this stream A), one for other events (we’ll call this Stream B). 

Stage 2 (A): wrap the event in a singleton list so that it’s type compatible 
with merging later

Stage 2 (B): window the events as you were doing before

Stage 3: Merge streams A and B back into each other. This will give you a 
stream of lists of events where high priority events bypass the window

Stage 4+: add a filter and/or mapping stage to remove low priority events that 
have been invalidated by the high priority event. 

———
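
The staged pipeline above can be modelled with plain JDK collections. This is only a sketch under stated assumptions, not PushStream code: the `Event` type, its `seq` arrival counter, and the count-based window are hypothetical stand-ins (the real window stage is time-based), and "invalidated" is taken to mean "a low priority event that arrived before a high priority event for the same diagram".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PriorityBypassSketch {

    /** Hypothetical event type; seq records arrival order. */
    static final class Event {
        final long seq;
        final String diagramId;
        final boolean high;

        Event(long seq, String diagramId, boolean high) {
            this.seq = seq;
            this.diagramId = diagramId;
            this.high = high;
        }
    }

    /** Stages 1 to 3: split by priority, wrap or window, then merge into one stream of lists. */
    static List<List<Event>> splitWindowMerge(List<Event> arrivals, int windowSize) {
        List<List<Event>> merged = new ArrayList<>();
        List<Event> window = new ArrayList<>();
        for (Event e : arrivals) {
            if (e.high) {
                // Stage 2 (A): singleton list, emitted at once, bypassing the window
                merged.add(Collections.singletonList(e));
            } else {
                // Stage 2 (B): buffer until the (count-based, assumed) window closes
                window.add(e);
                if (window.size() == windowSize) {
                    merged.add(window);
                    window = new ArrayList<>();
                }
            }
        }
        if (!window.isEmpty()) {
            merged.add(window); // flush the partial window when the input ends
        }
        return merged;
    }

    /** Stage 4: drop low priority events that an earlier-emitted high priority event supersedes. */
    static List<List<Event>> dropInvalidated(List<List<Event>> merged) {
        Map<String, Long> lastHighSeq = new HashMap<>();
        List<List<Event>> out = new ArrayList<>();
        for (List<Event> batch : merged) {
            List<Event> kept = new ArrayList<>();
            for (Event e : batch) {
                if (e.high) {
                    lastHighSeq.merge(e.diagramId, e.seq, Long::max);
                    kept.add(e);
                } else {
                    Long high = lastHighSeq.get(e.diagramId);
                    if (high == null || e.seq > high) {
                        kept.add(e); // newer than any high priority update, still needed
                    }
                }
            }
            if (!kept.isEmpty()) {
                out.add(kept);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> arrivals = new ArrayList<>();
        arrivals.add(new Event(1, "A", false));
        arrivals.add(new Event(2, "B", false));
        arrivals.add(new Event(3, "A", true));
        arrivals.add(new Event(4, "A", false));
        for (List<Event> batch : dropInvalidated(splitWindowMerge(arrivals, 3))) {
            StringBuilder sb = new StringBuilder();
            for (Event e : batch) {
                sb.append(e.seq).append(e.high ? "H " : "L ");
            }
            System.out.println(sb.toString().trim());
        }
    }
}
```

In this model the high priority event for diagram A is emitted before the window that still holds the older low priority event for A, so the filter can drop that older event when the window finally arrives.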

I say that this will “probably” work for you because there may be other things 
that you want to do in the high priority event case. 

Certainly worth a try :)

Tim


> On 15 Mar 2019, at 19:20, Willy Montes wrote:
> 
> Hello,
> 
> We tried using the QueuePolicy to accomplish our priority/merge management 
> but it didn't work.
> 
> Our main goal is to process the Diagram update events in the background to 
> avoid slowing down other processes. We set up a first pushStream where we 
> publish the low priority events, and we handle them using the 
> PushStream#window consumer to process them in bulk.
> 
> Every time a high priority Diagram Event comes, we want to remove all low 
> priority events for the same Diagram from the first stream, merge them, and 
> process them right away. We wanted to do that with a second pushStream 
> dedicated for handling high priority events.
> 
> Why 2 pushStreams? Because each one is configured with a different executor: 
> the one handling low priority events will use low priority daemon threads, 
> while the other won't.
> 
> The QueuePolicy didn't work, because every time the QueuePolicy#doOffer 
> method is called, the queue passed as parameter was empty. Using custom 
> queues, we were able to see that right after an event was queued, it was 
> polled and put in some other buffer. In that way, when the next event 
> arrived, the queue was again empty. This won't allow any logic in the policy 
> to modify the order/priority of the events.
> 
> We tried using custom PriorityQueues as buffers for the pushStream, but the 
> same issue happens.
>  
> Having additional parallel structures referencing the objects published in 
> the streams defeats the purpose for us.
> 
> Not sure if there is any caveat around the PushStream#window consumer. We are 
> still wondering if we are on the right track by using streams for this. 
> 
> Does anyone have any insight here?
> 
> Thanks,
> 
> Willy Montes.
> 
>> On Wed, Feb 27, 2019 at 9:55 AM Alain Picard  wrote:
>> Willy,
>> 
>> Re-asked and got some answers this morning. IMHO the PushStream QueuePolicy 
>> looks the most promising. It seems to fit right in. You would then need only 
>> 1 push stream and not 2, and just control the queuing "offer".
>> 
>> Alain
>> 
>> pic...@castortech.com
>> www.castortech.com
>> 
>> Forwarded Conversation
>> Subject: Removing queued events in Push Steams
>> 
>> 
>> From: Alain Picard 
>> Date: Tue, Feb 5, 2019 at 1:28 PM
>> To: OSGi Developer Mail List 
>> 
>> 
>> Hi,
>> 
>> We have cases where we need to process events with different priorities, and 
>> the priority can change after the initial event has been queued but not yet 
>> processed.
>> 
>> For example, when there is an event that some content has changed, we 
>> subscribe to this event, and based on some conditions this might trigger the 
>> need to update some diagrams in our case. This is considered a "background 
>> priority" event, since we simply want to get it updated when we have some 
>> spare cycles, so as not to be stuck 

Re: [osgi-dev] Removing queued events in Push Steams

2019-02-27 Thread Bernd Eckenfels via osgi-dev
For the concrete scenario at hand, you could have the low-priority update events 
carry an if-older-than field, so you would not dequeue them but let the processor 
discard them. That is much easier than having concurrent updates (although it is 
only efficient for certain traffic patterns).
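
A minimal sketch of this discard-on-processing idea, using only the JDK. The class name, the `createdAt` timestamp standing in for the if-older-than field, and the per-diagram bookkeeping map are all assumptions made for illustration; the thread does not show the real event type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StaleEventFilter {

    // diagramId -> timestamp up to which the diagram is known to be current (assumed bookkeeping)
    private final Map<String, Long> lastUpdated = new ConcurrentHashMap<>();

    /** Called by whichever path has just brought the diagram up to date, e.g. the high priority one. */
    void markUpdated(String diagramId, long timestamp) {
        lastUpdated.merge(diagramId, timestamp, Long::max);
    }

    /**
     * Processes a low priority event unless it is older than the last update.
     * Returns true if the update ran, false if the event was discarded as stale.
     * The check and the update are not atomic; a redundant update is assumed harmless.
     */
    boolean process(String diagramId, long createdAt, Runnable update) {
        Long last = lastUpdated.get(diagramId);
        if (last != null && createdAt <= last) {
            return false; // superseded: nothing to dequeue, just skip it here
        }
        update.run();
        markUpdated(diagramId, createdAt);
        return true;
    }

    public static void main(String[] args) {
        StaleEventFilter filter = new StaleEventFilter();
        filter.markUpdated("diagram-1", 100L); // high priority update finished at t=100
        System.out.println(filter.process("diagram-1", 50L, () -> System.out.println("updating")));
        System.out.println(filter.process("diagram-1", 150L, () -> System.out.println("updating")));
    }
}
```

The low priority events stay in their stream untouched; the cost is that stale events still travel through the pipeline before being dropped, which is why this only pays off for some traffic patterns.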

Regards
Bernd
--
http://bernd.eckenfels.net


From: osgi-dev-boun...@mail.osgi.org on behalf of Alain Picard via osgi-dev
Sent: Wednesday, February 27, 2019 12:50 PM
To: OSGi Developer Mail List
Subject: Re: [osgi-dev] Removing queued events in Push Steams

Does anyone have any insight here?
Alain

On Tue, Feb 5, 2019 at 1:28 PM Alain Picard <pic...@castortech.com> wrote:
Hi,

We have cases where we need to process events with different priorities, and 
the priority can change after the initial event has been queued but not yet 
processed.

For example, when there is an event that some content has changed, we subscribe 
to this event, and based on some conditions this might trigger the need to 
update some diagrams in our case. This is considered a "background priority" 
event, since we simply want to get it updated when we have some spare cycles, so 
as not to be stuck doing it whenever someone requests such a diagram to view/edit it.

We also have events where, for example, someone requests to open such a diagram. 
There we need to determine whether it is up to date and, if it needs to be 
updated, get this pushed and processed as quickly as possible, as the user is waiting.

So far we have set up 2 different push streams to support this.

The issue here is that when a high-priority event comes in, we need to 
make sure that we can cancel any similar queued events from the low priority 
stream, and possibly let such an event proceed if it is already being processed.

What is the best approach here? Are we on the right track to start with?

Thanks
Alain

___
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev

Re: [osgi-dev] Removing queued events in Push Steams

2019-02-27 Thread Tim Ward via osgi-dev
Another option would be for you to take control of the queuing using a 
QueuePolicy. That would enable you to insert work at the head of the buffer (or 
at least higher up) if it was more important. You could also remove some of the 
entries if they are invalidated by the higher priority insert.
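
As a rough illustration of that offer logic, here is a JDK-only model. It deliberately does not implement the PushStream `QueuePolicy` interface: in the real API this logic would sit in `QueuePolicy#doOffer` and the queue would hold push events, so the `Event` type, the `BlockingDeque` buffer, and the `offer` helper below are hypothetical stand-ins.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class PriorityOfferSketch {

    /** Hypothetical event type. */
    static final class Event {
        final String diagramId;
        final boolean high;

        Event(String diagramId, boolean high) {
            this.diagramId = diagramId;
            this.high = high;
        }
    }

    /**
     * Offers an event to the buffer. A high priority event first removes the queued
     * low priority events for the same diagram and then jumps to the head of the buffer.
     * The removal and the insert are two separate steps, so a concurrent consumer may
     * still take an entry in between; addFirst also assumes the deque has spare capacity.
     */
    static void offer(BlockingDeque<Event> queue, Event event) {
        if (event.high) {
            queue.removeIf(q -> !q.high && q.diagramId.equals(event.diagramId));
            queue.addFirst(event);
        } else {
            queue.addLast(event);
        }
    }

    public static void main(String[] args) {
        BlockingDeque<Event> queue = new LinkedBlockingDeque<>();
        offer(queue, new Event("A", false));
        offer(queue, new Event("B", false));
        offer(queue, new Event("A", false));
        offer(queue, new Event("A", true));
        for (Event e : queue) {
            System.out.println(e.diagramId + (e.high ? " high" : " low"));
        }
    }
}
```

This only helps while events are actually waiting in the buffer; entries that a worker has already taken are out of the policy's reach.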

I await the results of your research with interest :)

Best Regards,

Tim

> On 27 Feb 2019, at 12:45, Peter Kriens via osgi-dev wrote:
> 
> I probably would use a (static?) priority set with a weak reference to the 
> event object. (Or some key that uniquely identifies that object). The 
> processor can then consult this set to see if the event has higher priority. 
> A weak reference is needed to make sure that no events remain in this 
> priority set without locking.  
> 
> PK

Re: [osgi-dev] Removing queued events in Push Steams

2019-02-27 Thread Peter Kriens via osgi-dev
I probably would use a (static?) priority set with a weak reference to the 
event object. (Or some key that uniquely identifies that object). The processor 
can then consult this set to see if the event has higher priority. A weak 
reference is needed to make sure that no events remain in this priority set 
without locking.  
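
One way to sketch such a priority set with the JDK is a set backed by a `WeakHashMap`. The class and method names here are made up for illustration, and the sketch assumes the event object (or a key object whose lifetime matches the event) is used as the element, since `WeakHashMap` compares keys with `equals()` and only drops entries once the key is no longer strongly reachable.

```java
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;

final class WeakPrioritySet {

    // Weakly referenced elements: an entry disappears on its own once the event is
    // garbage collected, so nothing has to clean the set up explicitly.
    private final Set<Object> promoted = Collections.synchronizedSet(
            Collections.newSetFromMap(new WeakHashMap<Object, Boolean>()));

    /** Marks an event (or its key) as high priority. */
    void promote(Object eventOrKey) {
        promoted.add(eventOrKey);
    }

    /** Lets the processor ask whether an event has been promoted in the meantime. */
    boolean isHighPriority(Object eventOrKey) {
        return promoted.contains(eventOrKey);
    }

    public static void main(String[] args) {
        WeakPrioritySet priorities = new WeakPrioritySet();
        Object queuedEvent = new Object(); // stands in for an event already in the stream
        Object otherEvent = new Object();
        priorities.promote(queuedEvent);
        System.out.println(priorities.isHighPriority(queuedEvent));
        System.out.println(priorities.isHighPriority(otherEvent));
    }
}
```

The events themselves stay where they are in the stream; only the processor's decision changes when it consults the set.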

PK


Re: [osgi-dev] Removing queued events in Push Steams

2019-02-27 Thread Alain Picard via osgi-dev
Does anyone have any insight here?
Alain


[osgi-dev] Removing queued events in Push Steams

2019-02-05 Thread Alain Picard via osgi-dev
Hi,

We have cases where we need to process events with different priorities,
and the priority can change after the initial event has been queued but
not yet processed.

For example, when there is an event that some content has changed, we
subscribe to this event, and based on some conditions this might trigger the
need to update some diagrams in our case. This is considered a "background
priority" event, since we simply want to get it updated when we have some
spare cycles, so as not to be stuck doing it whenever someone requests such
a diagram to view/edit it.

We also have events where, for example, someone requests to open such a
diagram. There we need to determine whether it is up to date and, if it
needs to be updated, get this pushed and processed as quickly as possible,
as the user is waiting.

So far we have set up 2 different push streams to support this.

The issue here is that when a high-priority event comes in, we need
to make sure that we can cancel any similar queued events from the low
priority stream, and possibly let such an event proceed if it is already
being processed.

What is the best approach here? Are we on the right track to start with?

Thanks
Alain