Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-06 Thread Tim Ward via osgi-dev
So having dug further, the following is sufficient:

Promise<List<Collection<Integer>>> result = psp.buildStream(sps)
        .withExecutor(Executors.newFixedThreadPool(2))
        .build()
        .asyncMap(2, 0, new StreamDebouncer<Integer>(
                new PromiseFactory(PromiseFactory.inlineExecutor()),
                10, Duration.ofSeconds(1)))
        .filter(c -> !c.isEmpty())
        .collect(Collectors.toList());

Note the “withExecutor” call. The default executor has only one thread (because
the default parallelism is 1), and that thread is used to send events down the
pipe. In this case the close event is sent and reaches the asyncMap operation.
The close event then waits for the promises from the ongoing asyncMaps to
complete.

In a separate thread the final debounce window is closed because it times out. 
This triggers the gathering of the batch of data and tries to send it on to the 
rest of the stream using the sending thread which is blocked trying to send the 
close event! This deadlocks because the data cannot be sent until the sender 
thread is available, and the sender thread is waiting for the data event to 
have been sent before continuing.

The simple fix is to provide a second thread in the executor. This does not 
result in events arriving out of order because the parallelism of the stream is 
still one. This deadlock is also only triggered by terminal events, which are 
the only kind that can block a sender thread, so no data is harmed. Also, any 
number of sender threads greater than one resolves the problem regardless of 
the applied parallelism.
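
The starvation described above can be reproduced without any PushStream classes. The following is only a minimal sketch using plain java.util.concurrent: the class name, the two latches and the one second timeout are made up for illustration, and the "close" and "deliver" tasks merely stand in for the close event and the timed-out window. It is not the PushStream implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SenderStarvationDemo {

    // Returns true if the simulated "close" task finishes within one second.
    static boolean closeCompletes(int senderThreads) throws Exception {
        ExecutorService sender = Executors.newFixedThreadPool(senderThreads);
        CountDownLatch closeStarted = new CountDownLatch(1);
        CountDownLatch lastBatchSent = new CountDownLatch(1);
        try {
            // Stands in for the close event: it occupies a sender thread
            // until the final batch has been delivered.
            Callable<Void> closeTask = () -> {
                closeStarted.countDown();
                lastBatchSent.await();
                return null;
            };
            Future<Void> close = sender.submit(closeTask);
            closeStarted.await();

            // Stands in for the timed-out window: delivering its batch
            // needs a free sender thread.
            Runnable deliverTask = lastBatchSent::countDown;
            sender.submit(deliverTask);

            try {
                close.get(1, TimeUnit.SECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // the close task is starved of its data event
            }
        } finally {
            sender.shutdownNow(); // interrupts a close task that is still waiting
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 sender thread:  " + closeCompletes(1));
        System.out.println("2 sender threads: " + closeCompletes(2));
    }
}
```

With one sender thread the deliver task sits in the executor queue behind the blocked close task, so the method returns false. With two threads the deliver task runs on the second thread and the close task completes.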

I hope this helps,

Tim


Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-06 Thread Tim Ward via osgi-dev
Hi Alain,

Having stepped through this in a debugger, the issue I can see is actually a
deadlock in the async worker pool: we call asyncMap with a parallelism of 2,
but we use the default executor from the push stream, which has only one thread.

If the timing is wrong then we end up with the final debounce window timing out
and trying to complete while the one pushing thread is blocked waiting for the
timeout to be sent.

I’ll dig a little further, but the probable answer is just that one of the 
thread pools needs an extra thread (or possibly that the inline executor needs 
to be changed).

Best Regards,

Tim


Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-06 Thread Alain Picard via osgi-dev
Jurgen,

Thanks for the clarifications. I also did about the same change to match
our standard stream implementation, and didn't get the issue, but I was too
much in the dark to make any meaningful conclusions.

I was looking to modify the test to verify various scenarios. So I guess
I'll make those changes and proceed from there.

Thanks
Alain



Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-06 Thread Jürgen Albert via osgi-dev

Creating the PushStream with an exclusive Executor fixes the problem.

Promise<List<Collection<Integer>>> result = psp.buildStream(sps)
                .withBuffer(new ArrayBlockingQueue<PushEvent<? extends Integer>>(32))
                .withExecutor(Executors.newCachedThreadPool()).build()
                .asyncMap(5, 0, new StreamDebouncer<Integer>(
                        new PromiseFactory(PromiseFactory.inlineExecutor()),
                        10, Duration.ofSeconds(1)))
                .filter(c -> !c.isEmpty())
                .collect(Collectors.toList());


Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-06 Thread Jürgen Albert via osgi-dev

Hi Alain,

The issue has a couple of causes:

By default the PushStream and the event source use two queues with a size of
32 each. The default pushback policy is linear. This means that when 10
events are in the buffer, a pushback of 10 ms is returned to the event
source, which then waits that long before it sends the next event
downstream. This default behaviour can cause long processing times,
especially when a lot of events are published in a for loop: the queues fill
up very quickly even if the actual processing time of your code is close to 0.
Use e.g. the ON_FULL_FIXED policy to get rid of this problem.
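
To make the difference between the two policies concrete, here is a small sketch in plain Java. It is a simplified model only: the class and method names are invented, and the formulas and the 32 ms maximum are assumptions chosen for illustration, since the exact formula and default values used by the PushStream implementation are not shown in this thread.

```java
class PushbackModel {

    // Linear model: zero when the buffer is empty, rising to maxMillis
    // when the buffer is full (assumed formula).
    static long linear(int queued, int capacity, long maxMillis) {
        return (maxMillis * queued) / capacity;
    }

    // On-full-fixed model: zero until the buffer is full, then a fixed value.
    static long onFullFixed(int queued, int capacity, long fixedMillis) {
        return queued >= capacity ? fixedMillis : 0;
    }

    public static void main(String[] args) {
        int capacity = 32;
        for (int queued : new int[] {0, 8, 16, 32}) {
            System.out.println(queued + " queued: linear="
                    + linear(queued, capacity, 32) + " ms, onFullFixed="
                    + onFullFixed(queued, capacity, 32) + " ms");
        }
    }
}
```

In this model a producer publishing in a tight loop is slowed down by the linear policy as soon as anything is queued, while the on-full-fixed policy leaves it alone until the buffer is actually full.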


As far as I understand the debouncer, it waits a second before it returns
a list if no new events arrive. Thus a sleep time of less than a second,
or even 1.5 seconds (together with the pushback effect I described before),
will keep the stream busy longer than your sleep time for a batch. Thus
all the batches return on hitting the max size, except the last one.
The last one waits and, for some threading reason, the last deferred is
blocked from resolving, which in turn blocks the event source close. If you
add a small wait before close is called, everything is fine.


The blocking issue is interesting nonetheless, but my experience is
that these kinds of tests are often harsher than reality.


Regards,

Jürgen.


Re: [osgi-dev] Push Streams Event grouping and batching

2019-02-05 Thread Alain Picard via osgi-dev
Tim,

Finally got around to this debouncer, and I tried changing the sleep
time. When I set it to something like 800 to 1500, it never completes after
showing "Closing the Generator". At 500, I get a Queue full, which I can
understand. So why the hang?

Alain



On Mon, Jan 7, 2019 at 8:11 AM Tim Ward  wrote:

> This use case is effectively a “debouncing” behaviour, which is possible
> to implement with a little thought.
>
> There are a couple of ways to attempt it. This one uses the asyncMap
> operation to asynchronously gather the events until it either times out the
> promise or it hits the maximum stream size. Note that you have to filter
> out the “empty” lists that are used to resolve the promises which are being
> aggregated into the window. The result of this is a window which starts on
> the first event arrival and then buffers the events for a while. The next
> window isn’t started until the next event arrives.
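
The windowing rule described in the quoted paragraph can be modelled without streams or promises. This is a sketch under stated assumptions: the class and method names are invented, arrival times are given up front rather than observed, and the last window is simply flushed at the end where the real debouncer would close it by timeout.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceWindowModel {

    // arrivals: event arrival times in milliseconds, in ascending order.
    static List<Integer> windowSizes(long[] arrivals, int maxSize, long maxMillis) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        long windowStart = 0;
        for (long t : arrivals) {
            if (count > 0 && t - windowStart >= maxMillis) {
                sizes.add(count); // window timed out before this event arrived
                count = 0;
            }
            if (count == 0) {
                windowStart = t; // a new window starts on its first event
            }
            count++;
            if (count == maxSize) {
                sizes.add(count); // window closed by reaching the maximum size
                count = 0;
            }
        }
        if (count > 0) {
            sizes.add(count); // the final window would close by timeout
        }
        return sizes;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[46];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i < 23 ? 0 : 2000; // two bursts of 23 events, 2 s apart
        }
        System.out.println(windowSizes(arrivals, 10, 1000));
    }
}
```

For two bursts of 23 events with a maximum size of 10 and a one second window, the model yields windows of 10, 10 and 3 events per burst, which matches the burst size and parameters used in the test.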
>
>
> Best Regards,
>
> Tim
>
>
> @Test
> public void testWindow2() throws InvocationTargetException,
>         InterruptedException {
>
>     PushStreamProvider psp = new PushStreamProvider();
>
>     SimplePushEventSource<Integer> sps =
>             psp.createSimpleEventSource(Integer.class);
>
>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>                     10, Duration.ofSeconds(1)))
>             .filter(c -> !c.isEmpty())
>             .collect(Collectors.toList());
>
>     // Publish bursts of 23 events from a separate thread, pausing between bursts
>     new Thread(() -> {
>
>         for (int i = 0; i < 200;) {
>
>             for (int j = 0; j < 23; j++) {
>                 sps.publish(i++);
>             }
>
>             try {
>                 System.out.println("Burst finished, now at " + i);
>                 Thread.sleep(2000);
>             } catch (InterruptedException e) {
>                 sps.error(e);
>                 break;
>             }
>         }
>
>         System.out.println("Closing generator");
>         sps.close();
>
>     }).start();
>
>     // Blocks until the stream has closed and all windows have been collected
>     System.out.println(result.getValue().toString());
>
> }
>
>
> // Function is org.osgi.util.function.Function, whose apply may throw Exception
> public static class StreamDebouncer<T> implements Function<T, Promise<? extends Collection<T>>> {
>
>     private final PromiseFactory promiseFactory;
>     private final int maxSize;
>     private final Duration maxTime;
>
>     private final Object lock = new Object();
>
>     // The window currently being filled and the deferred that closes it
>     private List<T> currentWindow;
>     private Deferred<Collection<T>> currentDeferred;
>
>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize,
>             Duration maxTime) {
>         this.promiseFactory = promiseFactory;
>         this.maxSize = maxSize;
>         this.maxTime = maxTime;
>     }
>
>     @Override
>     public Promise<Collection<T>> apply(T t) throws Exception {
>
>         Deferred<Collection<T>> deferred = null;
>         Collection<T> list = null;
>         boolean hitMaxSize = false;
>         synchronized (lock) {
>             if (currentWindow == null) {
>                 // First event of a new window
>                 currentWindow = new ArrayList<>(maxSize);
>                 currentDeferred = promiseFactory.deferred();
>                 deferred = currentDeferred;
>                 list = currentWindow;
>             }
>             currentWindow.add(t);
>             if (currentWindow.size() == maxSize) {
>                 // Window is full, so close it now
>                 hitMaxSize = true;
>                 deferred = currentDeferred;
>                 currentDeferred = null;
>                 list = currentWindow;
>                 currentWindow = null;
>             }
>         }
>
>         if (deferred != null) {
>             if (hitMaxSize) {
>                 // We must resolve this way round to avoid racing
>                 // the timeout and ending up with empty lists in
>                 // all the promises
>                 deferred.resolve(Collections.emptyList());
>                 return promiseFactory.resolved(list);
>             } else {
>                 final Collection<T> finalList = list;
>                 return deferred.getPromise()
>                         .timeout(maxTime.toMillis())
>                         .recover(x -> {
>                             synchronized (lock) {
>                                 if (currentWindow == finalList) {
>                                     currentWindow = null;
>                                     currentDeferred = null;
>                                     return finalList;
>                                 }
>                             }
>                             return Collections.emptyList();
>                         });
>             }
>         } else {
>             // Event joined an existing window; contribute an empty list
>             return promiseFactory.resolved(Collections.emptyList());
>         }
>     }
>
> }
>
>
>
> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev 
> wrote:
>
> Thanks Jürgen,
>
> As I said, that is what I went with and it is very satisfying right now. As
> for pushback, I'm out of luck since the producer is fully "disjoint" with a
> whiteboard pattern, but we have configured ourselves with appropriate
> buffering. But I'll keep this code in mind, as we will surely have other
> patterns to support as we use push streams more and more.
>
> Alain
>
>
> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
> osgi-dev@mail.osgi.org> wrote:
>
>> Hi Alain,
>>
>> windowing would be your go-to method. AFAIK there is no way to extend a
>> window if you expect more messages to arrive. Thus you would need to live
>> with multiple batches in case of a prolonged burst. Back pressure, however,
>> is possible, even if you use a buffer and/or windowing. The solution would
>> look like this:
>>
>> psp.createPushStreamBuilder()
>>         .withPushbackPolicy( q -> {
>>             return Math.max(0, q.size() - 650);
>>         })
>>         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>         .build();
>>
>> This PushbackPolicy looks at the queue size and gradually increases the
>> pushback, starting with one on the 651st element.
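
The arithmetic of that policy can be checked in isolation. The sketch below uses only the JDK: the class name, the helper method and the use of ToLongFunction in place of the PushStream policy type are assumptions made so that it runs without the OSGi libraries, and it assumes the returned value is interpreted as milliseconds of back pressure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.ToLongFunction;

class ThresholdPushbackDemo {

    // Same arithmetic as the quoted lambda: no pushback up to 650 queued
    // elements, then one more unit per additional element.
    static final ToLongFunction<BlockingQueue<?>> POLICY =
            q -> Math.max(0, q.size() - 650);

    // Fills a queue of capacity 1200 with the given number of elements
    // and returns the pushback the policy would report for it.
    static long pushbackFor(int queued) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1200);
        for (int i = 0; i < queued; i++) {
            queue.add(i);
        }
        return POLICY.applyAsLong(queue);
    }

    public static void main(String[] args) {
        for (int queued : new int[] {0, 650, 651, 1200}) {
            System.out.println(queued + " queued -> pushback " + pushbackFor(queued));
        }
    }
}
```

The pushback stays at zero for the first 650 elements, is 1 for the 651st, and reaches 550 when the 1200 element buffer is full.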
>>
>> The grouping is another topic. The split method 

Re: [osgi-dev] Push Streams Event grouping and batching

2019-01-07 Thread Tim Ward via osgi-dev
I wrote the whole of this myself this morning - I tend to do examples as unit 
tests because they’re easy for people to try out and run. 

I’m not aware of a substantial PushStream test bucket other than the OSGi
Compliance Test Suite, which is available to members.

If you want to start an open source PushStream implementation project then I’m 
sure it could live alongside the Promises implementation in Apache Aries!

Tim. 


Re: [osgi-dev] Push Streams Event grouping and batching

2019-01-07 Thread Alain Picard via osgi-dev
Tim,

Thanks, will review and apply.

BTW, this seems to come from some of the tests. I've been looking for where
the tests are located, since they are often very revealing about how various
aspects actually work, but I have not been able to find them. Are they on
GitHub somewhere?

Alain


On Mon, Jan 7, 2019 at 8:11 AM Tim Ward  wrote:

> This use case is effectively a “debouncing” behaviour, which is possible
> to implement with a little thought.
>
> There are a couple of ways to attempt it. This one uses the asyncMap
> operation to asynchronously gather the events until it either times out the
> promise or it hits the maximum window size. Note that you have to filter
> out the “empty” lists that are used to resolve the promises which are being
> aggregated into the window. The result of this is a window which starts on
> the first event arrival and then buffers the events for a while. The next
> window isn’t started until the next event arrives.
>
>
> Best Regards,
>
> Tim
>
>
> @Test
> public void testWindow2() throws InvocationTargetException,
>         InterruptedException {
>
>     PushStreamProvider psp = new PushStreamProvider();
>
>     SimplePushEventSource<Integer> sps =
>             psp.createSimpleEventSource(Integer.class);
>
>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>                     10, Duration.ofSeconds(1)))
>             .filter(c -> !c.isEmpty())
>             .collect(Collectors.toList());
>
>     new Thread(() -> {
>
>         for (int i = 0; i < 200;) {
>
>             for (int j = 0; j < 23; j++) {
>                 sps.publish(i++);
>             }
>
>             try {
>                 System.out.println("Burst finished, now at " + i);
>                 Thread.sleep(2000);
>             } catch (InterruptedException e) {
>                 sps.error(e);
>                 break;
>             }
>         }
>
>         System.out.println("Closing generator");
>         sps.close();
>
>     }).start();
>
>     System.out.println(result.getValue().toString());
>
> }
>
> public static class StreamDebouncer<T> implements
>         Function<T, Promise<? extends Collection<T>>> {
>
>     private final PromiseFactory promiseFactory;
>     private final int maxSize;
>     private final Duration maxTime;
>
>     private final Object lock = new Object();
>
>     private List<T> currentWindow;
>     private Deferred<Collection<T>> currentDeferred;
>
>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize,
>             Duration maxTime) {
>         this.promiseFactory = promiseFactory;
>         this.maxSize = maxSize;
>         this.maxTime = maxTime;
>     }
>
>     @Override
>     public Promise<Collection<T>> apply(T t) throws Exception {
>
>         Deferred<Collection<T>> deferred = null;
>         Collection<T> list = null;
>         boolean hitMaxSize = false;
>         synchronized (lock) {
>             if(currentWindow == null) {
>                 currentWindow = new ArrayList<>(maxSize);
>                 currentDeferred = promiseFactory.deferred();
>                 deferred = currentDeferred;
>                 list = currentWindow;
>             }
>             currentWindow.add(t);
>             if(currentWindow.size() == maxSize) {
>                 hitMaxSize = true;
>                 deferred = currentDeferred;
>                 currentDeferred = null;
>                 list = currentWindow;
>                 currentWindow = null;
>             }
>         }
>
>         if(deferred != null) {
>             if(hitMaxSize) {
>                 // We must resolve this way round to avoid racing
>                 // the timeout and ending up with empty lists in
>                 // all the promises
>                 deferred.resolve(Collections.emptyList());
>                 return promiseFactory.resolved(list);
>             } else {
>                 final Collection<T> finalList = list;
>                 return deferred.getPromise()
>                         .timeout(maxTime.toMillis())
>                         .recover(x -> {
>                             synchronized (lock) {
>                                 if(currentWindow == finalList) {
>                                     currentWindow = null;
>                                     currentDeferred = null;
>                                     return finalList;
>                                 }
>                             }
>                             return Collections.emptyList();
>                         });
>             }
>         } else {
>             return promiseFactory.resolved(Collections.emptyList());
>         }
>     }
>
> }
>
>
>
> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev 
> wrote:
>
> Thanks Jürgen,
>
> As I said, that is what I went with, and it is working well so far. As
> for pushback, I'm out of luck since the producer is fully "disjoint" with a
> whiteboard pattern, but we have configured appropriate buffering. I'll keep
> this code in mind, as we will surely have other patterns to support as we
> use push streams more and more.
>
> Alain
>
>
> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
> osgi-dev@mail.osgi.org> wrote:
>
>> Hi Alain,
>>
>> windowing would be your go-to method. AFAIK there is no way to extend a
>> window if you expect more messages to arrive, so you would need to live
>> with multiple batches in case of a prolonged burst. Back pressure, however,
>> is possible even if you use a buffer and/or windowing. The solution would
>> look like this:
>>
>> psp.createPushStreamBuilder()
>>     .withPushbackPolicy(q -> {
>>         return Math.max(0, q.size() - 650);
>>     })
>>     .withQueuePolicy(QueuePolicyOption.BLOCK)
>>     .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>     .build();
>>
>> This PushbackPolicy looks at the queue size and gradually increases the
>> pushback, starting with one on the 651st element.
>>
>> The grouping is 

Re: [osgi-dev] Push Streams Event grouping and batching

2019-01-07 Thread Tim Ward via osgi-dev
This use case is effectively a “debouncing” behaviour, which is possible to 
implement with a little thought.

There are a couple of ways to attempt it. This one uses the asyncMap operation 
to asynchronously gather the events until it either times out the promise or it 
hits the maximum window size. Note that you have to filter out the “empty” 
lists that are used to resolve the promises which are being aggregated into the 
window. The result of this is a window which starts on the first event arrival 
and then buffers the events for a while. The next window isn’t started until 
the next event arrives.


Best Regards,

Tim


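import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.junit.Test; // JUnit 4 assumed
import org.osgi.util.function.Function; // the OSGi Function, not java.util.function
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

    @Test
    public void testWindow2() throws InvocationTargetException,
            InterruptedException {

        PushStreamProvider psp = new PushStreamProvider();

        SimplePushEventSource<Integer> sps =
                psp.createSimpleEventSource(Integer.class);

        // Batch up to 10 events per window, or whatever arrived within 1 second
        Promise<List<Collection<Integer>>> result = psp.createStream(sps)
                .asyncMap(2, 0, new StreamDebouncer<Integer>(
                        new PromiseFactory(PromiseFactory.inlineExecutor()),
                        10, Duration.ofSeconds(1)))
                .filter(c -> !c.isEmpty())
                .collect(Collectors.toList());

        // Publish bursts of 23 events with a 2 second pause between bursts
        new Thread(() -> {

            for (int i = 0; i < 200;) {

                for (int j = 0; j < 23; j++) {
                    sps.publish(i++);
                }

                try {
                    System.out.println("Burst finished, now at " + i);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    sps.error(e);
                    break;
                }
            }

            System.out.println("Closing generator");
            sps.close();

        }).start();

        System.out.println(result.getValue().toString());

    }

    public static class StreamDebouncer<T> implements
            Function<T, Promise<? extends Collection<T>>> {

        private final PromiseFactory promiseFactory;
        private final int maxSize;
        private final Duration maxTime;

        private final Object lock = new Object();

        private List<T> currentWindow;
        private Deferred<Collection<T>> currentDeferred;

        public StreamDebouncer(PromiseFactory promiseFactory, int maxSize,
                Duration maxTime) {
            this.promiseFactory = promiseFactory;
            this.maxSize = maxSize;
            this.maxTime = maxTime;
        }

        @Override
        public Promise<Collection<T>> apply(T t) throws Exception {

            Deferred<Collection<T>> deferred = null;
            Collection<T> list = null;
            boolean hitMaxSize = false;
            synchronized (lock) {
                // The first event of a window opens it and owns its promise
                if(currentWindow == null) {
                    currentWindow = new ArrayList<>(maxSize);
                    currentDeferred = promiseFactory.deferred();
                    deferred = currentDeferred;
                    list = currentWindow;
                }
                currentWindow.add(t);
                // The event that fills the window closes it
                if(currentWindow.size() == maxSize) {
                    hitMaxSize = true;
                    deferred = currentDeferred;
                    currentDeferred = null;
                    list = currentWindow;
                    currentWindow = null;
                }
            }

            if(deferred != null) {
                if(hitMaxSize) {
                    // We must resolve this way round to avoid racing
                    // the timeout and ending up with empty lists in
                    // all the promises
                    deferred.resolve(Collections.emptyList());
                    return promiseFactory.resolved(list);
                } else {
                    final Collection<T> finalList = list;
                    return deferred.getPromise()
                            .timeout(maxTime.toMillis())
                            .recover(x -> {
                                // Timed out: emit the window if still open
                                synchronized (lock) {
                                    if(currentWindow == finalList) {
                                        currentWindow = null;
                                        currentDeferred = null;
                                        return finalList;
                                    }
                                }
                                return Collections.emptyList();
                            });
                }
            } else {
                // Events that neither open nor close a window resolve empty
                return promiseFactory.resolved(Collections.emptyList());
            }
        }

    }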

Re: [osgi-dev] Push Streams Event grouping and batching

2019-01-07 Thread Alain Picard via osgi-dev
Thanks Jürgen,

As I said, that is what I went with, and it is working well so far. As
for pushback, I'm out of luck since the producer is fully "disjoint" with a
whiteboard pattern, but we have configured appropriate buffering. I'll keep
this code in mind, as we will surely have other patterns to support as we
use push streams more and more.

Alain


On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
osgi-dev@mail.osgi.org> wrote:

> Hi Alain,
>
> windowing would be your go-to method. AFAIK there is no way to extend a
> window if you expect more messages to arrive, so you would need to live
> with multiple batches in case of a prolonged burst. Back pressure, however,
> is possible even if you use a buffer and/or windowing. The solution would
> look like this:
>
> psp.createPushStreamBuilder()
>     .withPushbackPolicy(q -> {
>         return Math.max(0, q.size() - 650);
>     })
>     .withQueuePolicy(QueuePolicyOption.BLOCK)
>     .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>     .build();
>
> This PushbackPolicy looks at the queue size and gradually increases the
> pushback, starting with one on the 651st element.
>
> Grouping is another topic. The split method can do your grouping if
> you know what groups to expect. It essentially returns an array of
> push streams, one for each predicate you give it. For everything else,
> you would need to do the grouping for every batch you get with the usual
> stream methods.
>
> Regards,
>
> Jürgen.
>
> On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
>
> For now I went with my simple solution of using a window with just
> duration, and that is working fine, even if it might not be the most
> optimal or streamlined approach.
>
> Alain
>
>
> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard  wrote:
>
>> Hi,
>>
>> We are using push streams to process post-commit events. Those events
>> originate from different data sources. At the moment we are processing
>> those individually, but the overhead of having a transaction for each is
>> too much. Quite often those events come in bursts following an upstream
>> transaction/ change set.
>>
>> The goal is to group events by data source and batch them, i.e. wait a
>> bit when an event arrives to see if others are also coming. If they keep
>> coming, keep collecting a bit longer, o/w move on.
>>
>> I see that the PushStream has methods coalesce and window. Window seems a
>> bit more appropriate here, as it offers both duration and maxEvents. But it
>> seems to operate all the time, and not start a batch upon receiving an
>> event, which doesn't sound optimal in this case. More concerning to me is
>> the comment regarding back-pressure. We can't use back pressure (no control
>> on producer which is implemented via whiteboard). So here the maxEvents is
>> more a way to limit the batch and not to indicate need for back pressure.
>>
>> Still, that doesn't address grouping. See that there is a fork, but that
>> is made to deal with a fixed number of child streams.
>>
>> Would I just be best to use a window with just duration, collect a number
>> of events, then move on and use a regular stream to group them and if
>> necessary batch them in smaller groups?
>>
>> Cheers,
>>
>> Alain
>>
>>
> ___
> OSGi Developer Mail List
> osgi-dev@mail.osgi.org
> https://mail.osgi.org/mailman/listinfo/osgi-dev
>
>
> --
> Jürgen Albert
> Managing Director
>
> Data In Motion Consulting GmbH
>
> Kahlaische Str. 4
> 07745 Jena
>
> Mobile: 0157-72521634
> E-Mail: j.alb...@datainmotion.de
> Web: www.datainmotion.de
>
> XING:   https://www.xing.com/profile/Juergen_Albert5
>
> Legal
>
> Jena HBR 513025
>

Re: [osgi-dev] Push Streams Event grouping and batching

2019-01-07 Thread Jürgen Albert via osgi-dev

Hi Alain,

windowing would be your go-to method. AFAIK there is no way to extend a 
window if you expect more messages to arrive, so you would need to 
live with multiple batches in case of a prolonged burst. Back pressure, 
however, is possible even if you use a buffer and/or windowing. The 
solution would look like this:


psp.createPushStreamBuilder()
    .withPushbackPolicy(q -> {
        return Math.max(0, q.size() - 650);
    })
    .withQueuePolicy(QueuePolicyOption.BLOCK)
    .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
    .build();

This PushbackPolicy looks at the queue size and gradually increases the 
pushback, starting with one on the 651st element.
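
As a rough illustration of that arithmetic only, here is a plain-Java sketch that applies the same rule to an ordinary queue. It does not use the Push Stream API at all; the class name PushbackSketch and the method pushback are made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical helper, not part of the OSGi Push Stream API: it only
// reproduces the arithmetic of the pushback lambda on a plain queue.
final class PushbackSketch {

    // Same rule as the lambda: no back pressure up to 650 queued
    // elements, then one unit more for every further element.
    static long pushback(Queue<?> queue) {
        return Math.max(0, queue.size() - 650);
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= 652; i++) {
            queue.add(i);
            if (i >= 650) {
                // Prints pushback 0, 1 and 2 for 650, 651 and 652 queued elements
                System.out.println(i + " queued -> pushback " + pushback(queue));
            }
        }
    }
}
```

With a buffer capacity of 1200, as in the builder call, this rule would reach at most 550 on a full queue.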


Grouping is another topic. The split method can do your grouping if 
you know what groups to expect. It essentially returns an array of 
push streams, one for each predicate you give it. For everything 
else, you would need to do the grouping for every batch you get with the 
usual stream methods.
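
Grouping one batch with the usual stream methods might look roughly like the sketch below. The Event type, its dataSource field and the class name BatchGrouping are assumptions for illustration; the thread does not show the real event type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical example types; only java.util.stream is real API here.
final class BatchGrouping {

    // Stand-in for a post-commit event that knows its data source
    static final class Event {
        final String dataSource;
        final int id;

        Event(String dataSource, int id) {
            this.dataSource = dataSource;
            this.id = id;
        }
    }

    // Groups one batch (e.g. the contents of one window) by data source
    static Map<String, List<Event>> groupBySource(List<Event> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(e -> e.dataSource));
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("orders", 1),
                new Event("customers", 2),
                new Event("orders", 3));
        Map<String, List<Event>> groups = groupBySource(batch);
        System.out.println("orders events: " + groups.get("orders").size());
    }
}
```

Each map entry can then be processed in its own transaction, which is the point of batching per data source.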


Regards,

Jürgen.

On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
For now I went with my simple solution of using a window with just 
duration, and that is working fine, even if it might not be the most 
optimal or streamlined approach.


Alain


On Sat, Jan 5, 2019 at 5:27 AM Alain Picard wrote:


Hi,

We are using push streams to process post-commit events. Those
events originate from different data sources. At the moment we are
processing those individually, but the overhead of having a
transaction for each is too much. Quite often those events come in
bursts following an upstream transaction/ change set.

The goal is to group events by data source and batch them, i.e.
wait a bit when an event arrives to see if others are also coming.
If they keep coming, keep collecting a bit longer, o/w move on.

I see that the PushStream has methods coalesce and window. Window
seems a bit more appropriate here, as it offers both duration and
maxEvents. But it seems to operate all the time, and not start a
batch upon receiving an event, which doesn't sound optimal in this
case. More concerning to me is the comment regarding
back-pressure. We can't use back pressure (no control on producer
which is implemented via whiteboard). So here the maxEvents is more
a way to limit the batch and not to indicate need for back pressure.

Still, that doesn't address grouping. See that there is a fork,
but that is made to deal with a fixed number of child streams.

Would I just be best to use a window with just duration, collect a
number of events, then move on and use a regular stream to group
them and if necessary batch them in smaller groups?

Cheers,

Alain





--
Jürgen Albert
Managing Director

Data In Motion Consulting GmbH

Kahlaische Str. 4
07745 Jena

Mobile: 0157-72521634
E-Mail: j.alb...@datainmotion.de
Web: www.datainmotion.de

XING:   https://www.xing.com/profile/Juergen_Albert5

Legal

Jena HBR 513025


[osgi-dev] Push Streams Event grouping and batching

2019-01-05 Thread Alain Picard via osgi-dev
Hi,

We are using push streams to process post-commit events. Those events
originate from different data sources. At the moment we are processing
those individually, but the overhead of having a transaction for each is
too much. Quite often those events come in bursts following an upstream
transaction/ change set.

The goal is to group events by data source and batch them, i.e. wait a bit
when an event arrives to see if others are also coming. If they keep
coming, keep collecting a bit longer, otherwise move on.

I see that the PushStream has methods coalesce and window. Window seems a
bit more appropriate here, as it offers both duration and maxEvents. But it
seems to operate all the time, and not start a batch upon receiving an
event, which doesn't sound optimal in this case. More concerning to me is
the comment regarding back pressure. We can't use back pressure (no control
over the producer, which is implemented via whiteboard). So here maxEvents is
more a way to limit the batch than to indicate a need for back pressure.

Still, that doesn't address grouping. I see that there is a fork, but that is
made to deal with a fixed number of child streams.

Would I be best off using a window with just a duration, collecting a number
of events, then moving on and using a regular stream to group them and, if
necessary, batch them in smaller groups?
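
The last step, cutting an already collected group into smaller batches, could be sketched as follows. This is plain Java with no Push Stream API involved; the class name BatchSplitter and its split method are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of any Push Stream API.
final class BatchSplitter {

    // Splits events into consecutive batches of at most maxSize elements,
    // preserving the original order.
    static <T> List<List<T>> split(List<T> events, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < events.size(); from += maxSize) {
            int to = Math.min(from + maxSize, events.size());
            // Copy the sub-list view so each batch is independent of the source
            batches.add(new ArrayList<>(events.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            events.add(i);
        }
        // 23 events with a limit of 10 give batches of 10, 10 and 3
        System.out.println(split(events, 10).size() + " batches");
    }
}
```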

Cheers,

Alain