Tim,

Finally got around to this debouncer, and I experimented with changing the
sleep time. When I set it to somewhere between 800 and 1500, the test never
completes after showing "Closing generator". At 500, I get a "Queue full"
exception, which I can understand. So why the hang?
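[Editor's note: to help reason about the timing involved here, the window semantics of the StreamDebouncer quoted below (collect until maxSize events arrive, or until maxTime elapses after the first event, whichever comes first) can be sketched without any OSGi machinery. This is a minimal stand-alone analogue, not the PushStream code; all class and method names are hypothetical.]

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified, OSGi-free sketch of the debounce-window semantics:
// events are collected into a window that is flushed either when it
// reaches maxSize or when maxTime elapses after its first event.
class SimpleDebouncer<T> {
    private final int maxSize;
    private final long maxTimeMillis;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final BlockingQueue<List<T>> flushed = new LinkedBlockingQueue<>();

    private final Object lock = new Object();
    private List<T> window;            // null when no window is open
    private ScheduledFuture<?> timeout;

    SimpleDebouncer(int maxSize, Duration maxTime) {
        this.maxSize = maxSize;
        this.maxTimeMillis = maxTime.toMillis();
    }

    void publish(T t) {
        synchronized (lock) {
            if (window == null) {
                window = new ArrayList<>();
                final List<T> w = window;
                // flush this window if it is still open when time is up
                timeout = timer.schedule(() -> flush(w),
                        maxTimeMillis, TimeUnit.MILLISECONDS);
            }
            window.add(t);
            if (window.size() == maxSize) {
                timeout.cancel(false);
                List<T> w = window;
                window = null;
                flushed.add(w);
            }
        }
    }

    private void flush(List<T> w) {
        synchronized (lock) {
            if (window == w) {   // only flush if nobody else already did
                window = null;
                flushed.add(w);
            }
        }
    }

    List<T> take() {
        try {
            return flushed.take();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

With maxSize 4 and a 200 ms window, publishing ten events in a tight burst yields two full windows of four, and then a trailing window of two once the timeout fires.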
Alain

On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:

> This use case is effectively a “debouncing” behaviour, which is possible
> to implement with a little thought.
>
> There are a couple of ways to attempt it. This one uses the asyncMap
> operation to asynchronously gather the events until it either times out
> the promise or it hits the maximum stream size. Note that you have to
> filter out the “empty” lists that are used to resolve the promises which
> are being aggregated into the window. The result of this is a window
> which starts on the first event arrival and then buffers the events for
> a while. The next window isn’t started until the next event.
>
> Best Regards,
>
> Tim
>
> @Test
> public void testWindow2() throws InvocationTargetException,
>         InterruptedException {
>
>     PushStreamProvider psp = new PushStreamProvider();
>
>     SimplePushEventSource<Integer> sps =
>             psp.createSimpleEventSource(Integer.class);
>
>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>                     10, Duration.ofSeconds(1)))
>             .filter(c -> !c.isEmpty())
>             .collect(Collectors.toList());
>
>     new Thread(() -> {
>
>         for (int i = 0; i < 200;) {
>
>             for (int j = 0; j < 23; j++) {
>                 sps.publish(i++);
>             }
>
>             try {
>                 System.out.println("Burst finished, now at " + i);
>                 Thread.sleep(2000);
>             } catch (InterruptedException e) {
>                 sps.error(e);
>                 break;
>             }
>         }
>
>         System.out.println("Closing generator");
>         sps.close();
>
>     }).start();
>
>     System.out.println(result.getValue().toString());
> }
>
> public static class StreamDebouncer<T>
>         implements Function<T, Promise<? extends Collection<T>>> {
>
>     private final PromiseFactory promiseFactory;
>     private final int maxSize;
>     private final Duration maxTime;
>
>     private final Object lock = new Object();
>
>     private List<T> currentWindow;
>     private Deferred<Collection<T>> currentDeferred;
>
>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize,
>             Duration maxTime) {
>         this.promiseFactory = promiseFactory;
>         this.maxSize = maxSize;
>         this.maxTime = maxTime;
>     }
>
>     @Override
>     public Promise<Collection<T>> apply(T t) throws Exception {
>
>         Deferred<Collection<T>> deferred = null;
>         Collection<T> list = null;
>         boolean hitMaxSize = false;
>         synchronized (lock) {
>             if (currentWindow == null) {
>                 currentWindow = new ArrayList<>(maxSize);
>                 currentDeferred = promiseFactory.deferred();
>                 deferred = currentDeferred;
>                 list = currentWindow;
>             }
>             currentWindow.add(t);
>             if (currentWindow.size() == maxSize) {
>                 hitMaxSize = true;
>                 deferred = currentDeferred;
>                 currentDeferred = null;
>                 list = currentWindow;
>                 currentWindow = null;
>             }
>         }
>
>         if (deferred != null) {
>             if (hitMaxSize) {
>                 // We must resolve this way round to avoid racing
>                 // the timeout and ending up with empty lists in
>                 // all the promises
>                 deferred.resolve(Collections.emptyList());
>                 return promiseFactory.resolved(list);
>             } else {
>                 final Collection<T> finalList = list;
>                 return deferred.getPromise()
>                         .timeout(maxTime.toMillis())
>                         .recover(x -> {
>                             synchronized (lock) {
>                                 if (currentWindow == finalList) {
>                                     currentWindow = null;
>                                     currentDeferred = null;
>                                     return finalList;
>                                 }
>                             }
>                             return Collections.emptyList();
>                         });
>             }
>         } else {
>             return promiseFactory.resolved(Collections.emptyList());
>         }
>     }
> }
>
> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev <osgi-dev@mail.osgi.org>
> wrote:
>
> Thanks Jürgen,
>
> As I said that is what I went with and it is very satisfying right now.
> As for pushback, I'm out of luck since the producer is fully "disjoint"
> with a whiteboard pattern, but we have configured ourselves with
> appropriate buffering. But I'll keep this code in mind, as we will surely
> have other patterns to support as we use push streams more and more.
>
> Alain
>
> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <
> osgi-dev@mail.osgi.org> wrote:
>
>> Hi Alain,
>>
>> Windowing would be your go-to method. AFAIK there is no way to extend a
>> window if you expect more messages to arrive. Thus you would need to
>> live with multiple batches in case of a prolonged burst. Back pressure,
>> however, is possible even if you use a buffer and/or windowing. The
>> solution would look like this:
>>
>> psp.createPushStreamBuilder()
>>         .withPushbackPolicy(q -> {
>>             return Math.max(0, q.size() - 650);
>>         })
>>         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>         .build();
>>
>> This PushbackPolicy looks at the queue size and gradually increases the
>> pushback, starting with one on the 651st element.
>>
>> The grouping is another topic. The split method can do your grouping if
>> you know what groups to expect. It essentially returns an array of push
>> streams correlating to each predicate you give it. For everything else,
>> you would need to do the grouping for every batch you get with the
>> usual stream methods.
>>
>> Regards,
>>
>> Jürgen.
>>
>> On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
>>
>> For now I went with my simple solution of using a window with just
>> duration, and that is working fine, even if it might not be the most
>> optimal or streamlined approach.
>>
>> Alain
>>
>> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using push streams to process post-commit events. Those events
>>> originate from different data sources.
>>> At the moment we are processing those individually, but the overhead
>>> of having a transaction for each is too much. Quite often those events
>>> come in bursts following an upstream transaction/change set.
>>>
>>> The goal is to group events by data source and batch them, i.e. wait a
>>> bit when an event arrives to see if others are also coming. If they
>>> keep coming, keep collecting a bit longer, otherwise move on.
>>>
>>> I see that the PushStream has methods coalesce and window. Window
>>> seems a bit more appropriate here, as it offers both duration and
>>> maxEvents. But it seems to operate all the time, and not start a batch
>>> upon receiving an event, which doesn't sound optimal in this case.
>>> More concerning to me is the comment regarding back-pressure. We can't
>>> use back pressure (no control over the producer, which is implemented
>>> via whiteboard). So here the maxEvents is more a way to limit the
>>> batch and not to indicate a need for back pressure.
>>>
>>> Still, that doesn't address grouping. I see that there is a fork, but
>>> that is made to deal with a fixed number of child streams.
>>>
>>> Would I just be best to use a window with just duration, collect a
>>> number of events, then move on and use a regular stream to group them
>>> and, if necessary, batch them in smaller groups?
>>>
>>> Cheers,
>>>
>>> Alain
>>
>> _______________________________________________
>> OSGi Developer Mail List
>> osgi-dev@mail.osgi.org
>> https://mail.osgi.org/mailman/listinfo/osgi-dev
>>
>> --
>> Jürgen Albert
>> Managing Director (Geschäftsführer)
>>
>> Data In Motion Consulting GmbH
>>
>> Kahlaische Str. 4
>> 07745 Jena
>>
>> Mobile: 0157-72521634
>> E-Mail: j.alb...@datainmotion.de
>> Web: www.datainmotion.de
>>
>> XING: https://www.xing.com/profile/Juergen_Albert5
>>
>> Legal (Rechtliches)
>>
>> Jena HBR 513025
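[Editor's note: on the grouping question raised at the start of the thread, once a window of events has been collected, splitting it per data source is indeed an ordinary stream operation, as Jürgen suggests. A minimal sketch (plain Java, no OSGi dependencies; the Event shape and source names are made up for illustration):]

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical event shape: each event carries the id of the data
// source it originated from, plus some payload.
record Event(String source, int payload) {}

class BatchGrouper {
    // Group one collected window of events by data source, so that
    // each group can then be committed in a single transaction.
    static Map<String, List<Event>> groupBySource(List<Event> window) {
        return window.stream()
                .collect(Collectors.groupingBy(Event::source));
    }
}
```

Each value in the resulting map is one per-source batch; if a batch is still too large for a single transaction, it can be further chunked with an ordinary sublist loop.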
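[Editor's note: the gradual pushback in Jürgen's builder example above is just a linear function of queue size, which can be sketched stand-alone; the class and method names here are hypothetical.]

```java
// Same arithmetic as the PushbackPolicy lambda in Jürgen's example:
// no pushback while the queue holds 650 elements or fewer, then a
// delay that grows by one for every element above that (reaching 550
// when the 1200-element buffer is full).
class GradualPushback {
    static long pushbackFor(int queueSize) {
        return Math.max(0, queueSize - 650);
    }
}
```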