Jürgen,

Thanks for the clarifications. I made roughly the same change to match our standard stream implementation and didn't get the issue, but I was too much in the dark to draw any meaningful conclusions.
I was looking to modify the test to verify various scenarios, so I'll make those changes and proceed from there.

Thanks,
Alain

On Wed, Feb 6, 2019 at 9:21 AM Jürgen Albert <j.alb...@data-in-motion.biz> wrote:

> Creating the PushStream with an exclusive Executor fixes the problem:
>
> Promise<List<Collection<Integer>>> result = psp.buildStream(sps)
>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends Integer>>(32))
>         .withExecutor(Executors.newCachedThreadPool())
>         .build()
>         .asyncMap(5, 0, new StreamDebouncer<Integer>(
>                 new PromiseFactory(PromiseFactory.inlineExecutor()),
>                 10, Duration.ofSeconds(1)))
>         .filter(c -> !c.isEmpty())
>         .collect(Collectors.toList());
>
> On 06/02/2019 at 15:14, Jürgen Albert wrote:
>
> Hi Alain,
>
> the issue has a couple of causes:
>
> The PushStream and the event source each have, by default, a queue with a size of 32, and the default pushback policy is LINEAR. This means that when 10 events are in the buffer, a pushback of 10 ms is given to the event source, which then waits that long before it sends the next event downstream. This default behaviour can cause long processing times, especially when a lot of events are written in a for loop: the queues fill up very quickly even if the actual processing time of your code is close to 0. Use e.g. the ON_FULL_FIXED policy to get rid of this problem.
>
> As far as I understand the debouncer, it waits a second before it returns a list if no new events are coming. Thus a sleep time of less than a second, or even 1.5 seconds (together with the pushback behaviour I described before), will keep the stream busy longer than your sleep time for a batch. So all the batches return when hitting the max size, except the last one. That one waits, and for some threading reason the last deferred is blocked from resolving, which in turn blocks the event source close. If you add a small wait before close is called, everything is fine.
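[Editor's note] To make the LINEAR pushback described above concrete, here is a minimal, stdlib-only sketch. The class and method names are hypothetical stand-ins, not the real org.osgi.util.pushstream API; the point is only that with a linear policy the requested back pressure grows with the buffer depth, so 10 queued events yield a 10 ms delay.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinearPushbackSketch {

    // Hypothetical stand-in for the LINEAR pushback policy described above:
    // the back pressure (in ms) equals the number of events currently queued.
    static long linearPushback(BlockingQueue<?> buffer) {
        return buffer.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(32);
        for (int i = 0; i < 10; i++) {
            buffer.add(i);
        }
        // With 10 events buffered, the event source is asked to wait 10 ms
        // before delivering the next event.
        System.out.println(linearPushback(buffer)); // prints 10
    }
}
```

This is why a tight publish loop slows down long before the 32-slot buffer is actually full: every buffered event adds another millisecond of delay at the source.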
> The blocking issue is interesting nonetheless, but my experience is that these kinds of tests are often harsher than reality.
>
> Regards,
>
> Jürgen.
>
> On 05/02/2019 at 23:58, Alain Picard wrote:
>
> Tim,
>
> I finally got around to this debouncer, and I tested changing the sleep time. When I set it to something like 800 to 1500, it never completes after showing "Closing the Generator". At 500, I get a "Queue full" that I can understand. So why the hang?
>
> Alain
>
> On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:
>
>> This use case is effectively a "debouncing" behaviour, which is possible to implement with a little thought.
>>
>> There are a couple of ways to attempt it. This one uses the asyncMap operation to asynchronously gather the events until it either times out the promise or it hits the maximum stream size. Note that you have to filter out the "empty" lists that are used to resolve the promises which are being aggregated into the window. The result of this is a window which starts on the first event arrival and then buffers the events for a while. The next window isn't started until the next event arrives.
>>
>> Best Regards,
>>
>> Tim
>>
>> @Test
>> public void testWindow2() throws InvocationTargetException, InterruptedException {
>>
>>     PushStreamProvider psp = new PushStreamProvider();
>>
>>     SimplePushEventSource<Integer> sps = psp.createSimpleEventSource(Integer.class);
>>
>>     Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>>             .asyncMap(2, 0, new StreamDebouncer<Integer>(
>>                     new PromiseFactory(PromiseFactory.inlineExecutor()),
>>                     10, Duration.ofSeconds(1)))
>>             .filter(c -> !c.isEmpty())
>>             .collect(Collectors.toList());
>>
>>     new Thread(() -> {
>>
>>         for (int i = 0; i < 200;) {
>>
>>             for (int j = 0; j < 23; j++) {
>>                 sps.publish(i++);
>>             }
>>
>>             try {
>>                 System.out.println("Burst finished, now at " + i);
>>                 Thread.sleep(2000);
>>             } catch (InterruptedException e) {
>>                 sps.error(e);
>>                 break;
>>             }
>>         }
>>
>>         System.out.println("Closing generator");
>>         sps.close();
>>
>>     }).start();
>>
>>     System.out.println(result.getValue().toString());
>> }
>>
>>
>> public static class StreamDebouncer<T> implements Function<T, Promise<? extends Collection<T>>> {
>>
>>     private final PromiseFactory promiseFactory;
>>     private final int maxSize;
>>     private final Duration maxTime;
>>
>>     private final Object lock = new Object();
>>
>>     private List<T> currentWindow;
>>     private Deferred<Collection<T>> currentDeferred;
>>
>>     public StreamDebouncer(PromiseFactory promiseFactory, int maxSize, Duration maxTime) {
>>         this.promiseFactory = promiseFactory;
>>         this.maxSize = maxSize;
>>         this.maxTime = maxTime;
>>     }
>>
>>     @Override
>>     public Promise<Collection<T>> apply(T t) throws Exception {
>>
>>         Deferred<Collection<T>> deferred = null;
>>         Collection<T> list = null;
>>         boolean hitMaxSize = false;
>>         synchronized (lock) {
>>             if (currentWindow == null) {
>>                 currentWindow = new ArrayList<>(maxSize);
>>                 currentDeferred = promiseFactory.deferred();
>>                 deferred = currentDeferred;
>>                 list = currentWindow;
>>             }
>>             currentWindow.add(t);
>>             if (currentWindow.size() == maxSize) {
>>                 hitMaxSize = true;
>>                 deferred = currentDeferred;
>>                 currentDeferred = null;
>>                 list = currentWindow;
>>                 currentWindow = null;
>>             }
>>         }
>>
>>         if (deferred != null) {
>>             if (hitMaxSize) {
>>                 // We must resolve this way round to avoid racing the
>>                 // timeout and ending up with empty lists in all the promises
>>                 deferred.resolve(Collections.emptyList());
>>                 return promiseFactory.resolved(list);
>>             } else {
>>                 final Collection<T> finalList = list;
>>                 return deferred.getPromise()
>>                         .timeout(maxTime.toMillis())
>>                         .recover(x -> {
>>                             synchronized (lock) {
>>                                 if (currentWindow == finalList) {
>>                                     currentWindow = null;
>>                                     currentDeferred = null;
>>                                     return finalList;
>>                                 }
>>                             }
>>                             return Collections.emptyList();
>>                         });
>>             }
>>         } else {
>>             return promiseFactory.resolved(Collections.emptyList());
>>         }
>>     }
>> }
>>
>>
>> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev <osgi-dev@mail.osgi.org> wrote:
>>
>> Thanks Jürgen,
>>
>> As I said, that is what I went with, and it is very satisfying right now. As for pushback, I'm out of luck since the producer is fully "disjoint" via a whiteboard pattern, but we have configured ourselves with appropriate buffering. I'll keep this code in mind, as we will surely have other patterns to support as we use push streams more and more.
>>
>> Alain
>>
>> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev <osgi-dev@mail.osgi.org> wrote:
>>
>>> Hi Alain,
>>>
>>> windowing would be your go-to method. AFAIK there is no way to extend a window if you expect more messages to arrive, so you would need to live with multiple batches in case of a prolonged burst. Back pressure, however, is possible even if you use a buffer and/or windowing. The solution would look like this:
>>>
>>> psp.createPushStreamBuilder()
>>>         .withPushbackPolicy(q -> {
>>>             return Math.max(0, q.size() - 650);
>>>         })
>>>         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>>         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>>         .build();
>>>
>>> This PushbackPolicy looks at the queue size and gradually increases the pushback, starting with one on the 651st element.
>>>
>>> The grouping is another topic.
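[Editor's note] The gradual policy shown above can be sketched in isolation with only the standard library (hypothetical names; a plain ArrayBlockingQueue stands in for the real PushEvent buffer): pushback stays at 0 until the queue holds more than 650 events, then grows by 1 ms per additional element.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class GradualPushbackSketch {

    // Mirrors the lambda in the builder snippet above: no pushback up to
    // 650 buffered events, then 1 ms more for every further element.
    static long gradualPushback(BlockingQueue<?> buffer) {
        return Math.max(0, buffer.size() - 650);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1200);
        for (int i = 0; i < 651; i++) {
            buffer.add(i);
        }
        // The 651st element is the first to trigger back pressure.
        System.out.println(gradualPushback(buffer)); // prints 1
    }
}
```

The design point: bursts up to 650 events are absorbed at full speed, and the producer is only slowed once the buffer is genuinely at risk of filling its 1200 slots.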
>>> The split method can do your grouping, if you know what groups to expect. It essentially returns an array of push streams correlating to each predicate you give it. For everything else, you would need to do the grouping for every batch you get, with the usual stream methods.
>>>
>>> Regards,
>>>
>>> Jürgen.
>>>
>>> On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
>>>
>>> For now I went with my simple solution of using a window with just a duration, and that is working fine, even if it might not be the most optimal or streamlined approach.
>>>
>>> Alain
>>>
>>> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using push streams to process post-commit events. Those events originate from different data sources. At the moment we are processing them individually, but the overhead of having a transaction for each is too much. Quite often those events come in bursts following an upstream transaction/change set.
>>>>
>>>> The goal is to group events by data source and batch them, i.e. wait a bit when an event arrives to see if others are also coming. If they keep coming, keep collecting a bit longer; otherwise move on.
>>>>
>>>> I see that the PushStream has the methods coalesce and window. Window seems a bit more appropriate here, as it offers both a duration and maxEvents. But it seems to operate all the time, rather than starting a batch upon receiving an event, which doesn't sound optimal in this case. More concerning to me is the comment regarding back pressure: we can't use back pressure (we have no control over the producer, which is implemented via the whiteboard pattern). So here maxEvents is more a way to limit the batch than a way to indicate the need for back pressure.
>>>>
>>>> Still, that doesn't address grouping. I see that there is a fork, but that is made to deal with a fixed number of child streams.
>>>> Would I just be best to use a window with just a duration, collect a number of events, then move on and use a regular stream to group them and, if necessary, batch them into smaller groups?
>>>>
>>>> Cheers,
>>>>
>>>> Alain
>>>
>>> _______________________________________________
>>> OSGi Developer Mail List
>>> osgi-dev@mail.osgi.org
>>> https://mail.osgi.org/mailman/listinfo/osgi-dev
>>>
>>> --
>>> Jürgen Albert
>>> Geschäftsführer
>>>
>>> Data In Motion Consulting GmbH
>>>
>>> Kahlaische Str. 4
>>> 07745 Jena
>>>
>>> Mobil: 0157-72521634
>>> E-Mail: j.alb...@datainmotion.de
>>> Web: www.datainmotion.de
>>>
>>> XING: https://www.xing.com/profile/Juergen_Albert5
>>>
>>> Rechtliches
>>>
>>> Jena HBR 513025
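[Editor's note] For the "use a regular stream to group them" half of the approach discussed above, the per-data-source grouping of a windowed batch is plain java.util.stream work. A minimal sketch, assuming a hypothetical Event record with a source() accessor (the real events would come out of the window):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchGroupingSketch {

    // Hypothetical event type standing in for the real post-commit events.
    record Event(String source, int payload) {}

    // Group one windowed batch by data source with the usual stream methods.
    static Map<String, List<Event>> groupBySource(List<Event> batch) {
        return batch.stream().collect(Collectors.groupingBy(Event::source));
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
                new Event("db-a", 1),
                new Event("db-b", 2),
                new Event("db-a", 3));
        // One map entry per data source; each value is that source's batch,
        // ready to be committed in a single transaction.
        System.out.println(groupBySource(batch));
    }
}
```

Each map value can then be processed as one transaction per data source, which is exactly the batching the original question was after.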