Tim,

Finally got around to this debouncer, and I tested changing the sleep
time. When I set it to somewhere between 800 and 1500, it never completes
after showing "Closing generator". At 500, I get a "Queue full" exception,
which I can understand. So why the hang?

Alain



On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:

> This use case is effectively a “debouncing” behaviour, which is possible
> to implement with a little thought.
>
> There are a couple of ways to attempt it. This one uses the asyncMap
> operation to asynchronously gather events until the promise either times
> out or the maximum window size is reached. Note that you have to filter
> out the “empty” lists that are used to resolve the promises being
> aggregated into the window. The result is a window which starts on the
> first event's arrival and then buffers events for a while. The next
> window isn't started until the next event arrives.
>
>
> Best Regards,
>
> Tim
>
>
> @Test
> public void testWindow2() throws InvocationTargetException, InterruptedException {
>
>     PushStreamProvider psp = new PushStreamProvider();
>
>     SimplePushEventSource<Integer> sps = psp.createSimpleEventSource(Integer.class);
>
>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>                     10, Duration.ofSeconds(1)))
>             .filter(c -> !c.isEmpty())
>             .collect(Collectors.toList());
>
>     new Thread(() -> {
>
>         for (int i = 0; i < 200;) {
>
>             // Publish a burst of 23 events, then pause
>             for (int j = 0; j < 23; j++) {
>                 sps.publish(i++);
>             }
>
>             try {
>                 System.out.println("Burst finished, now at " + i);
>                 Thread.sleep(2000);
>             } catch (InterruptedException e) {
>                 sps.error(e);
>                 break;
>             }
>         }
>
>         System.out.println("Closing generator");
>         sps.close();
>
>     }).start();
>
>     System.out.println(result.getValue().toString());
> }
>
>
> public static class StreamDebouncer<T> implements Function<T, Promise<? extends Collection<T>>> {
>
>     private final PromiseFactory promiseFactory;
>     private final int maxSize;
>     private final Duration maxTime;
>
>     private final Object lock = new Object();
>
>     private List<T> currentWindow;
>     private Deferred<Collection<T>> currentDeferred;
>
>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize, Duration maxTime) {
>         this.promiseFactory = promiseFactory;
>         this.maxSize = maxSize;
>         this.maxTime = maxTime;
>     }
>
>     @Override
>     public Promise<Collection<T>> apply(T t) throws Exception {
>
>         Deferred<Collection<T>> deferred = null;
>         Collection<T> list = null;
>         boolean hitMaxSize = false;
>         synchronized (lock) {
>             if (currentWindow == null) {
>                 // First event starts a new window
>                 currentWindow = new ArrayList<>(maxSize);
>                 currentDeferred = promiseFactory.deferred();
>                 deferred = currentDeferred;
>                 list = currentWindow;
>             }
>             currentWindow.add(t);
>             if (currentWindow.size() == maxSize) {
>                 hitMaxSize = true;
>                 deferred = currentDeferred;
>                 currentDeferred = null;
>                 list = currentWindow;
>                 currentWindow = null;
>             }
>         }
>
>         if (deferred != null) {
>             if (hitMaxSize) {
>                 // We must resolve this way round to avoid racing
>                 // the timeout and ending up with empty lists in
>                 // all the promises
>                 deferred.resolve(Collections.emptyList());
>                 return promiseFactory.resolved(list);
>             } else {
>                 final Collection<T> finalList = list;
>                 return deferred.getPromise()
>                         .timeout(maxTime.toMillis())
>                         .recover(x -> {
>                             synchronized (lock) {
>                                 if (currentWindow == finalList) {
>                                     currentWindow = null;
>                                     currentDeferred = null;
>                                     return finalList;
>                                 }
>                             }
>                             return Collections.emptyList();
>                         });
>             }
>         } else {
>             return promiseFactory.resolved(Collections.emptyList());
>         }
>     }
> }
>
>
>
> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev <osgi-dev@mail.osgi.org>
> wrote:
>
> Thanks Jürgen,
>
> As I said, that is what I went with and it is working very well right now.
> As for pushback, I'm out of luck since the producer is fully "disjoint" via
> a whiteboard pattern, but we have configured ourselves with appropriate
> buffering. But I'll keep this code in mind, as we will surely have other
> patterns to support as we use push streams more and more.
>
> Alain
>
>
> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
> osgi-dev@mail.osgi.org> wrote:
>
>> Hi Alain,
>>
>> windowing would be your go-to method. AFAIK there is no way to extend a
>> window if you expect more messages to arrive, so you would need to live
>> with multiple batches in case of a prolonged burst. Back pressure,
>> however, is possible even if you use a buffer and/or windowing. The
>> solution would look like this:
>>
>> psp.createPushStreamBuilder()
>>         .withPushbackPolicy(q -> {
>>             return Math.max(0, q.size() - 650);
>>         })
>>         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>         .build();
>>
>> This PushbackPolicy looks at the queue size and gradually increases the
>> pushback, starting with one on the 651st element.
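To illustrate the arithmetic of that policy in isolation, here is a minimal sketch. The IntUnaryOperator stand-in and the class name are made up for illustration; the real policy receives the buffer queue itself rather than a plain size:

```java
import java.util.function.IntUnaryOperator;

public class PushbackSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the policy above: takes the current
        // queue size and returns the pushback to apply.
        IntUnaryOperator pushback = size -> Math.max(0, size - 650);

        System.out.println(pushback.applyAsInt(650));  // still zero pushback
        System.out.println(pushback.applyAsInt(651));  // first non-zero pushback
        System.out.println(pushback.applyAsInt(1200)); // grows toward the buffer limit
    }
}
```

So the pushback stays at zero until the queue holds more than 650 elements, then grows linearly as the 1200-element buffer fills.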
>>
>> Grouping is another topic. The split method can do your grouping if you
>> know what groups to expect. It essentially returns an array of push
>> streams, one for each predicate you give it. For everything else, you
>> would need to do the grouping for every batch you get with the usual
>> stream methods.
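The per-batch grouping with the "usual stream methods" that Jürgen mentions could be sketched like this. The Event record and its source() key are hypothetical stand-ins for the real event type:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchGroupingSketch {

    // Hypothetical event type; "source" plays the role of the data source key.
    record Event(String source, int payload) {}

    public static void main(String[] args) {
        // One batch, e.g. the contents of a single window
        List<Event> batch = List.of(
                new Event("db1", 1),
                new Event("db2", 2),
                new Event("db1", 3));

        // Group the batch by data source with a regular Java stream
        Map<String, List<Event>> bySource = batch.stream()
                .collect(Collectors.groupingBy(Event::source));

        System.out.println(bySource.get("db1").size()); // two events from db1
        System.out.println(bySource.get("db2").size()); // one event from db2
    }
}
```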
>>
>> Regards,
>>
>> Jürgen.
>>
>> Am 05/01/2019 um 19:47 schrieb Alain Picard via osgi-dev:
>>
>> For now I went with my simple solution of using a window with just a
>> duration, and that is working fine, even if it might not be the most
>> optimal or streamlined approach.
>>
>> Alain
>>
>>
>> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using push streams to process post-commit events. Those events
>>> originate from different data sources. At the moment we are processing
>>> them individually, but the overhead of having a transaction for each is
>>> too much. Quite often these events come in bursts following an upstream
>>> transaction/change set.
>>>
>>> The goal is to group events by data source and batch them, i.e. wait a
>>> bit when an event arrives to see if others are also coming. If they keep
>>> coming, keep collecting a bit longer; otherwise move on.
>>>
>>> I see that the PushStream has coalesce and window methods. Window seems
>>> a bit more appropriate here, as it offers both a duration and maxEvents.
>>> But it seems to operate all the time, rather than starting a batch upon
>>> receiving an event, which doesn't sound optimal in this case. More
>>> concerning to me is the comment regarding back-pressure. We can't use
>>> back pressure (no control over the producer, which is implemented via a
>>> whiteboard pattern). So here maxEvents is more a way to limit the batch
>>> than to indicate a need for back pressure.
>>>
>>> Still, that doesn't address grouping. I see that there is a fork method,
>>> but it is made to deal with a fixed number of child streams.
>>>
>>> Would it be best to just use a window with only a duration, collect a
>>> number of events, then move on and use a regular stream to group them
>>> and, if necessary, batch them in smaller groups?
>>>
>>> Cheers,
>>>
>>> Alain
>>>
>>>
>> _______________________________________________
>> OSGi Developer Mail List
>> osgi-dev@mail.osgi.org
>> https://mail.osgi.org/mailman/listinfo/osgi-dev
>>
>>
>> --
>> Jürgen Albert
>> Geschäftsführer
>>
>> Data In Motion Consulting GmbH
>>
>> Kahlaische Str. 4
>> 07745 Jena
>>
>> Mobil:  0157-72521634
>> E-Mail: j.alb...@datainmotion.de
>> Web: www.datainmotion.de
>>
>> XING:   https://www.xing.com/profile/Juergen_Albert5
>>
>> Rechtliches
>>
>> Jena HBR 513025
>>
>
>
>
>
