Jurgen,

Thanks for the clarifications. I had made roughly the same change to match
our standard stream implementation and didn't see the issue, but I was too
much in the dark to draw any meaningful conclusions.

I was planning to modify the test to verify various scenarios, so I'll make
those changes and proceed from there.

Thanks
Alain


On Wed, Feb 6, 2019 at 9:21 AM Jürgen Albert <j.alb...@data-in-motion.biz>
wrote:

> Creating the PushStream with an exclusive Executor fixes the problem.
>
>         Promise<List<Collection<Integer>>> result = psp.buildStream(sps)
>                 .withBuffer(new ArrayBlockingQueue<PushEvent<? extends Integer>>(32))
>                 .withExecutor(Executors.newCachedThreadPool())
>                 .build()
>                 .asyncMap(5, 0, new StreamDebouncer<Integer>(
>                         new PromiseFactory(PromiseFactory.inlineExecutor()),
>                         10, Duration.ofSeconds(1)))
>                 .filter(c -> !c.isEmpty())
>                 .collect(Collectors.toList());
>
> On 06/02/2019 at 15:14, Jürgen Albert wrote:
>
> Hi Alain,
>
> the issue has a couple of causes:
>
> The PushStream and the event source each have a queue with a default size of
> 32, and the default pushback policy is LINEAR. This means that when 10 events
> are in the buffer, a pushback of 10 ms is reported to the event source, which
> then waits that long before sending the next event downstream. This default
> behaviour can cause long processing times, especially when a lot of events are
> written in a for loop: the queues fill up very quickly even if the actual
> processing time of your code is close to 0. Use e.g. the ON_FULL_FIXED policy
> to get rid of this problem.
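>
> As an illustrative sketch (the buffer size and the fixed 10 ms pushback here
> are assumed placeholder values, not taken from this thread), switching the
> default LINEAR pushback to ON_FULL_FIXED on the stream builder might look
> roughly like this:
>
>         PushStream<Integer> stream = psp.buildStream(sps)
>                 .withBuffer(new ArrayBlockingQueue<PushEvent<? extends Integer>>(32))
>                 // no pushback until the buffer is actually full, then a fixed 10 ms
>                 .withPushbackPolicy(PushbackPolicyOption.ON_FULL_FIXED, 10)
>                 .build();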
>
> As far as I understand the debouncer, it waits a second before it returns a
> list if no new events are coming. Thus a sleep time of less than a second, or
> even 1.5 seconds (together with the pushback behaviour I described above),
> will keep the stream busy for longer than your sleep time for a batch. As a
> result, all the batches resolve when they hit the max size, except the last
> one. That one waits, and for some threading reason the last deferred is
> blocked from resolving, which in turn blocks the event source close. If you
> add a small wait before close is called, everything is fine.
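>
> As an illustrative tweak (the 500 ms pause is an arbitrary value), the
> generator thread in the test could wait briefly before closing:
>
>         try {
>             // give the last debounce window time to resolve before closing
>             Thread.sleep(500);
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>         }
>         System.out.println("Closing generator");
>         sps.close();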
>
> The blocking issue is interesting nonetheless, but my experience is that
> these kinds of tests are often harsher than reality.
>
> Regards,
>
> Jürgen.
>
> On 05/02/2019 at 23:58, Alain Picard wrote:
>
> Tim,
>
> Finally got around to this debouncer, and I tested changing the sleep time.
> When I set it to something like 800 to 1500, the test never completes after
> showing "Closing generator". At 500, I get a "Queue full" error, which I can
> understand. So why the hang?
>
> Alain
>
>
>
> On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:
>
>> This use case is effectively a “debouncing” behaviour, which is possible
>> to implement with a little thought.
>>
>> There are a couple of ways to attempt it. This one uses the asyncMap
>> operation to asynchronously gather events until the promise either times out
>> or the window hits its maximum size. Note that you have to filter out the
>> “empty” lists that are used to resolve the promises being aggregated into the
>> window. The result is a window which starts on the first event arrival and
>> then buffers events for a while. The next window isn’t started until the next
>> event arrives.
>>
>>
>> Best Regards,
>>
>> Tim
>>
>>
>> @Test
>> public void testWindow2() throws InvocationTargetException, InterruptedException {
>>
>>     PushStreamProvider psp = new PushStreamProvider();
>>
>>     SimplePushEventSource<Integer> sps = psp.createSimpleEventSource(Integer.class);
>>
>>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>>                     10, Duration.ofSeconds(1)))
>>             .filter(c -> !c.isEmpty())
>>             .collect(Collectors.toList());
>>
>>     new Thread(() -> {
>>
>>         for (int i = 0; i < 200;) {
>>
>>             for (int j = 0; j < 23; j++) {
>>                 sps.publish(i++);
>>             }
>>
>>             try {
>>                 System.out.println("Burst finished, now at " + i);
>>                 Thread.sleep(2000);
>>             } catch (InterruptedException e) {
>>                 sps.error(e);
>>                 break;
>>             }
>>         }
>>
>>         System.out.println("Closing generator");
>>         sps.close();
>>
>>     }).start();
>>
>>     System.out.println(result.getValue().toString());
>> }
>>
>>
>> public static class StreamDebouncer<T> implements Function<T, Promise<? extends Collection<T>>> {
>>
>>     private final PromiseFactory promiseFactory;
>>     private final int maxSize;
>>     private final Duration maxTime;
>>
>>     private final Object lock = new Object();
>>
>>     private List<T> currentWindow;
>>     private Deferred<Collection<T>> currentDeferred;
>>
>>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize, Duration maxTime) {
>>         this.promiseFactory = promiseFactory;
>>         this.maxSize = maxSize;
>>         this.maxTime = maxTime;
>>     }
>>
>>     @Override
>>     public Promise<Collection<T>> apply(T t) throws Exception {
>>
>>         Deferred<Collection<T>> deferred = null;
>>         Collection<T> list = null;
>>         boolean hitMaxSize = false;
>>         synchronized (lock) {
>>             if (currentWindow == null) {
>>                 currentWindow = new ArrayList<>(maxSize);
>>                 currentDeferred = promiseFactory.deferred();
>>                 deferred = currentDeferred;
>>                 list = currentWindow;
>>             }
>>             currentWindow.add(t);
>>             if (currentWindow.size() == maxSize) {
>>                 hitMaxSize = true;
>>                 deferred = currentDeferred;
>>                 currentDeferred = null;
>>                 list = currentWindow;
>>                 currentWindow = null;
>>             }
>>         }
>>
>>         if (deferred != null) {
>>             if (hitMaxSize) {
>>                 // We must resolve this way round to avoid racing
>>                 // the timeout and ending up with empty lists in
>>                 // all the promises
>>                 deferred.resolve(Collections.emptyList());
>>                 return promiseFactory.resolved(list);
>>             } else {
>>                 final Collection<T> finalList = list;
>>                 return deferred.getPromise()
>>                         .timeout(maxTime.toMillis())
>>                         .recover(x -> {
>>                             synchronized (lock) {
>>                                 if (currentWindow == finalList) {
>>                                     currentWindow = null;
>>                                     currentDeferred = null;
>>                                     return finalList;
>>                                 }
>>                             }
>>                             return Collections.emptyList();
>>                         });
>>             }
>>         } else {
>>             return promiseFactory.resolved(Collections.emptyList());
>>         }
>>     }
>> }
>>
>>
>>
>> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev <
>> osgi-dev@mail.osgi.org> wrote:
>>
>> Thanks Jürgen,
>>
>> As I said, that is what I went with and it is working well right now. As for
>> pushback, I'm out of luck since the producer is fully "disjoint" via a
>> whiteboard pattern, but we have configured appropriate buffering. I'll keep
>> this code in mind though, as we will surely have other patterns to support as
>> we use push streams more and more.
>>
>> Alain
>>
>>
>> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
>> osgi-dev@mail.osgi.org> wrote:
>>
>>> Hi Alain,
>>>
>>> windowing would be your go-to method. AFAIK there is no way to extend a
>>> window if you expect more messages to arrive, so you would need to live with
>>> multiple batches in the case of a prolonged burst. Back pressure, however, is
>>> possible even if you use a buffer and/or windowing. The solution would look
>>> like this:
>>>
>>> psp.createPushStreamBuilder()
>>>         .withPushbackPolicy(q -> {
>>>             return Math.max(0, q.size() - 650);
>>>         })
>>>         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>>         .build();
>>>
>>> This PushbackPolicy looks at the queue size and gradually increases the
>>> pushback, starting with 1 on the 651st element.
>>>
>>> Grouping is another topic. The split method can do your grouping if you
>>> know what groups to expect; it essentially returns an array of push streams,
>>> one per predicate you give it. For everything else, you would need to do the
>>> grouping for every batch you get with the usual stream methods.
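>>>
>>> As a rough sketch of the split approach (the Event type, the eventSource and
>>> psp variables, the getSource() accessor, the two source names and
>>> handleBatch() are assumed names for illustration, not taken from this
>>> thread):
>>>
>>>         // split() hands back one PushStream per predicate, in order
>>>         PushStream<Event>[] bySource = psp.createStream(eventSource).split(
>>>                 e -> "sourceA".equals(e.getSource()),   // bySource[0]
>>>                 e -> "sourceB".equals(e.getSource()));  // bySource[1]
>>>
>>>         bySource[0].window(Duration.ofMillis(500), batch -> batch)
>>>                 .forEach(batch -> handleBatch("sourceA", batch));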
>>>
>>> Regards,
>>>
>>> Jürgen.
>>>
>>> On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
>>>
>>> For now I went with my simple solution of using a window with just a
>>> duration, and that is working fine, even if it might not be the optimal or
>>> most streamlined approach.
>>>
>>> Alain
>>>
>>>
>>> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using push streams to process post-commit events. Those events
>>>> originate from different data sources. At the moment we are processing them
>>>> individually, but the overhead of having a transaction for each is too much.
>>>> Quite often these events come in bursts following an upstream
>>>> transaction/change set.
>>>>
>>>> The goal is to group events by data source and batch them, i.e. wait a bit
>>>> when an event arrives to see if others are also coming. If they keep coming,
>>>> keep collecting a bit longer; otherwise move on.
>>>>
>>>> I see that PushStream has coalesce and window methods. Window seems more
>>>> appropriate here, as it offers both a duration and maxEvents. But it seems to
>>>> operate all the time rather than starting a batch upon receiving an event,
>>>> which doesn't sound optimal in this case. More concerning to me is the
>>>> comment regarding back pressure. We can't use back pressure (we have no
>>>> control over the producer, which is implemented via the whiteboard pattern),
>>>> so here maxEvents is more a way to limit the batch size than a way to signal
>>>> the need for back pressure.
>>>>
>>>> Still, that doesn't address grouping. I see that there is a fork method,
>>>> but it is made to deal with a fixed number of child streams.
>>>>
>>>> Would I be best off using a window with just a duration, collecting a
>>>> number of events, and then using a regular stream to group them and, if
>>>> necessary, batch them into smaller groups?
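>>>>
>>>> As a minimal sketch of that idea (the 500 ms window, the Event type with a
>>>> getDataSource() accessor, handleBatch(), and the psp/eventSource names are
>>>> all assumed for illustration):
>>>>
>>>>         psp.createStream(eventSource)
>>>>                 .window(Duration.ofMillis(500), batch -> batch)
>>>>                 .filter(batch -> !batch.isEmpty())
>>>>                 .forEach(batch -> {
>>>>                     // regular java.util.stream grouping per collected batch
>>>>                     Map<String, List<Event>> bySource = batch.stream()
>>>>                             .collect(Collectors.groupingBy(Event::getDataSource));
>>>>                     bySource.forEach((source, events) -> handleBatch(source, events));
>>>>                 });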
>>>>
>>>> Cheers,
>>>>
>>>> Alain
>>>>
>>>>
>>
> --
> Jürgen Albert
> Managing Director
>
> Data In Motion Consulting GmbH
>
> Kahlaische Str. 4
> 07745 Jena
>
> Mobile:  0157-72521634
> E-Mail: j.alb...@datainmotion.de
> Web: www.datainmotion.de
>
> XING:   https://www.xing.com/profile/Juergen_Albert5
>
> Legal
>
> Jena HBR 513025
>
>
_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev
