Hi Alain,
the issue has a couple of causes:
By default, the PushStream and the event source have two queues with a
size of 32 each. The default pushback policy is linear. This means that
when 10 events are in the buffer, a pushback of 10 ms is returned to the
event source, which then waits that long before it sends the next event
downstream. This default behaviour can cause long processing times,
especially when a lot of events are published in a for loop: the queues
fill up very quickly, even if the actual processing time of your code is
close to 0. Use e.g. the ON_FULL_FIXED policy to get rid of this
problem.
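To make the difference between the two policies concrete, here is a minimal sketch in plain Java. It is a simplified model of the behaviour described above, not the push stream library's implementation: the class PushbackSketch, both methods and the figure of 1 ms per queued event are assumptions chosen only to match the "10 events, 10 ms" example.

```java
class PushbackSketch {
    /** Simplified linear model (assumption): pushback grows with every queued event. */
    static long linear(int queued, long millisPerQueuedEvent) {
        return queued * millisPerQueuedEvent;
    }

    /** Simplified "on full, fixed" model (assumption): no pushback until the buffer is full. */
    static long onFullFixed(int queued, int capacity, long fixedMillis) {
        return queued >= capacity ? fixedMillis : 0;
    }

    public static void main(String[] args) {
        int capacity = 32; // default queue size mentioned above
        for (int queued : new int[] {0, 10, 31, 32}) {
            System.out.println(queued + " queued: linear=" + linear(queued, 1)
                    + " ms, onFullFixed=" + onFullFixed(queued, capacity, 10) + " ms");
        }
    }
}
```

In this model a tight publishing loop is slowed down by the linear policy as soon as a few events are queued, while the on-full variant leaves the producer alone until the buffer is actually full.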
As far as I understand the debouncer, it waits a second before it
returns a list if no new events are coming. Thus a sleep time of less
than a second, or even 1.5 seconds (together with the pushback behaviour
I described above), will keep the stream busy longer than your sleep
time for a batch. Thus all the batches return when hitting the max size,
except the last one. This one waits, and for some threading reasons the
last deferred is blocked from resolving, which in turn blocks the event
source close. If you add a small wait before the close is called,
everything is fine.
The blocking issue is interesting nonetheless, but my experience is
that these kinds of tests are often harsher than reality.
Regards,
Jürgen.
On 05/02/2019 at 23:58, Alain Picard wrote:
Tim,
Finally got around to this debouncer, and I tried changing the sleep
time. When I set it to something like 800 to 1500, it never completes
after showing "Closing the Generator". At 500, I get a Queue full error,
which I can understand. So why the hang?
Alain
On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:
This use case is effectively a “debouncing” behaviour, which is
possible to implement with a little thought.
There are a couple of ways to attempt it. This one uses the
asyncMap operation to asynchronously gather the events until it
either times out the promise or it hits the maximum stream size.
Note that you have to filter out the “empty” lists that are used
to resolve the promises which are being aggregated into the
window. The result of this is a window which starts on the first
event arrival and then buffers the events for a while. The next
window isn’t started until the next event arrives.
Best Regards,
Tim
@Test
public void testWindow2() throws InvocationTargetException,
        InterruptedException {
    PushStreamProvider psp = new PushStreamProvider();
    SimplePushEventSource<Integer> sps =
            psp.createSimpleEventSource(Integer.class);
    Promise<List<Collection<Integer>>> result = psp.createStream(sps)
            .asyncMap(2, 0, new StreamDebouncer<Integer>(
                    new PromiseFactory(PromiseFactory.inlineExecutor()),
                    10, Duration.ofSeconds(1)))
            .filter(c -> !c.isEmpty())
            .collect(Collectors.toList());
    new Thread(() -> {
        for (int i = 0; i < 200;) {
            for (int j = 0; j < 23; j++) {
                sps.publish(i++);
            }
            try {
                System.out.println("Burst finished, now at " + i);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                sps.error(e);
                break;
            }
        }
        System.out.println("Closing generator");
        sps.close();
    }).start();
    System.out.println(result.getValue().toString());
}
public static class StreamDebouncer<T> implements Function<T,
        Promise<? extends Collection<T>>> {
    private final PromiseFactory promiseFactory;
    private final int maxSize;
    private final Duration maxTime;
    private final Object lock = new Object();
    private List<T> currentWindow;
    private Deferred<Collection<T>> currentDeferred;

    public StreamDebouncer(PromiseFactory promiseFactory, int maxSize,
            Duration maxTime) {
        this.promiseFactory = promiseFactory;
        this.maxSize = maxSize;
        this.maxTime = maxTime;
    }

    @Override
    public Promise<Collection<T>> apply(T t) throws Exception {
        Deferred<Collection<T>> deferred = null;
        Collection<T> list = null;
        boolean hitMaxSize = false;
        synchronized (lock) {
            if (currentWindow == null) {
                currentWindow = new ArrayList<>(maxSize);
                currentDeferred = promiseFactory.deferred();
                deferred = currentDeferred;
                list = currentWindow;
            }
            currentWindow.add(t);
            if (currentWindow.size() == maxSize) {
                hitMaxSize = true;
                deferred = currentDeferred;
                currentDeferred = null;
                list = currentWindow;
                currentWindow = null;
            }
        }
        if (deferred != null) {
            if (hitMaxSize) {
                // We must resolve this way round to avoid racing
                // the timeout and ending up with empty lists in
                // all the promises
                deferred.resolve(Collections.emptyList());
                return promiseFactory.resolved(list);
            } else {
                final Collection<T> finalList = list;
                return deferred.getPromise()
                        .timeout(maxTime.toMillis())
                        .recover(x -> {
                            synchronized (lock) {
                                if (currentWindow == finalList) {
                                    currentWindow = null;
                                    currentDeferred = null;
                                    return finalList;
                                }
                            }
                            return Collections.emptyList();
                        });
            }
        } else {
            return promiseFactory.resolved(Collections.emptyList());
        }
    }
}
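For readers who want to experiment with the windowing idea outside of push streams, the same behaviour (a window opens on the first event and is flushed either at the maximum size or after a timeout) can be sketched with only the JDK. This is an illustration, not a replacement for the StreamDebouncer above: DebounceBatcher, its sink callback and demoBatchSizes are hypothetical names, and a ScheduledExecutorService stands in for the promise timeout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper; not part of the OSGi push stream API. */
class DebounceBatcher<T> implements AutoCloseable {
    private final int maxSize;
    private final long maxMillis;
    private final Consumer<List<T>> sink;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private List<T> window;             // null while no window is open
    private ScheduledFuture<?> timeout; // timeout of the open window

    DebounceBatcher(int maxSize, long maxMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxMillis = maxMillis;
        this.sink = sink;
    }

    /** Adds an event; the first event after a flush opens a new window. */
    void accept(T event) {
        List<T> full = null;
        synchronized (this) {
            if (window == null) {
                List<T> opened = new ArrayList<>(maxSize);
                window = opened;
                timeout = timer.schedule(() -> flushIfStillOpen(opened),
                        maxMillis, TimeUnit.MILLISECONDS);
            }
            window.add(event);
            if (window.size() >= maxSize) {
                timeout.cancel(false);
                full = window;
                window = null;
            }
        }
        if (full != null) {
            sink.accept(full); // deliver outside the lock
        }
    }

    /** Timeout path: flush only if the window that timed out is still the open one. */
    private void flushIfStillOpen(List<T> opened) {
        boolean expired;
        synchronized (this) {
            expired = (window == opened);
            if (expired) {
                window = null;
            }
        }
        if (expired) {
            sink.accept(opened);
        }
    }

    /** Flushes whatever is left and stops the timer thread. */
    @Override
    public void close() {
        List<T> rest;
        synchronized (this) {
            rest = window;
            window = null;
            if (timeout != null) {
                timeout.cancel(false);
            }
        }
        timer.shutdown();
        if (rest != null) {
            sink.accept(rest);
        }
    }

    /** Publishes one burst of 23 events with maxSize 10 and records the batch sizes. */
    static List<Integer> demoBatchSizes() {
        List<Integer> sizes = Collections.synchronizedList(new ArrayList<>());
        try (DebounceBatcher<Integer> batcher =
                new DebounceBatcher<>(10, 1000, batch -> sizes.add(batch.size()))) {
            for (int i = 0; i < 23; i++) {
                batcher.accept(i);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(demoBatchSizes());
    }
}
```

A burst of 23 events with a maximum size of 10 should yield batches of 10, 10 and 3 here, the last one being flushed by close() rather than by the timeout.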
On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev
<osgi-dev@mail.osgi.org> wrote:
Thanks Jürgen,
As I said, that is what I went with and it is very satisfying
right now. As for pushback, I'm out of luck since the producer is
fully "disjoint" with a whiteboard pattern, but we have
configured ourselves with appropriate buffering. But I'll keep
this code in mind, as we will surely have other patterns to
support as we use push streams more and more.
Alain
On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev
<osgi-dev@mail.osgi.org> wrote:
Hi Alain,
windowing would be your go-to method. AFAIK there is no way
to extend a window if you expect more messages to arrive.
Thus you would need to live with multiple batches in case of
a prolonged burst. Back pressure, however, is possible, even if
you use a buffer and/or windowing. The solution would look
like this:
psp.createPushStreamBuilder()
    .withPushbackPolicy(q -> {
        return Math.max(0, q.size() - 650);
    })
    .withQueuePolicy(QueuePolicyOption.BLOCK)
    .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
    .build();
This PushbackPolicy looks at the queue size and gradually
increases the pushback, starting with one on the 651st element.
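As a quick check of that arithmetic, the expression from the lambda can be evaluated against a plain ArrayBlockingQueue. This is only a sketch of the calculation: the class ThresholdPushback and its helper methods are made up for the illustration, and nothing here touches the push stream builder itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ThresholdPushback {
    static final int THRESHOLD = 650; // threshold taken from the lambda above

    /** Same expression as in the pushback lambda, applied to any blocking queue. */
    static long pushback(BlockingQueue<?> queue) {
        return Math.max(0, queue.size() - THRESHOLD);
    }

    /** Hypothetical helper: a queue of capacity 1200 holding the given number of elements. */
    static BlockingQueue<Integer> queueWith(int elements) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1200);
        for (int i = 0; i < elements; i++) {
            queue.add(i);
        }
        return queue;
    }

    public static void main(String[] args) {
        for (int elements : new int[] {0, 650, 651, 1200}) {
            System.out.println(elements + " queued -> pushback "
                    + pushback(queueWith(elements)));
        }
    }
}
```

With 650 or fewer queued elements the value is 0, the 651st element gives 1, and a completely full buffer of 1200 gives 550.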
The grouping is another topic. The split method can do your
grouping, if you know what groups to expect. It essentially
returns an array of push streams corresponding to each predicate
you give it. For everything else, you would need to do the
grouping for every batch you get with the usual stream methods.
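Grouping a batch with the usual stream methods could look roughly like the following sketch. The Event class with its dataSource field is a stand-in for whatever the real events carry, and BatchGrouping is a hypothetical name; only java.util.stream is used.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BatchGrouping {
    /** Stand-in event type (assumption): only the data source matters here. */
    static final class Event {
        final String dataSource;
        final int id;

        Event(String dataSource, int id) {
            this.dataSource = dataSource;
            this.id = id;
        }
    }

    /** Groups one batch by data source, keeping first-seen order of the sources. */
    static Map<String, List<Event>> groupBySource(List<Event> batch) {
        return batch.stream().collect(Collectors.groupingBy(
                e -> e.dataSource, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("a", 1), new Event("b", 2), new Event("a", 3));
        groupBySource(batch).forEach((source, events) ->
                System.out.println(source + " -> " + events.size() + " event(s)"));
    }
}
```

Each resulting list can then be handled in its own transaction, which is the per-source batching the original question asks for.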
Regards,
Jürgen.
On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
For now I went with my simple solution of using a window
with just duration, and that is working fine, even if it
might not be the most optimal or streamlined approach.
Alain
On Sat, Jan 5, 2019 at 5:27 AM Alain Picard
<pic...@castortech.com> wrote:
Hi,
We are using push streams to process post-commit events.
Those events originate from different data sources. At
the moment we are processing those individually, but the
overhead of having a transaction for each is too much.
Quite often those events come in bursts following an
upstream transaction/ change set.
The goal is to group events by data source and batch
them, i.e. wait a bit when an event arrives to see if
others are also coming. If they keep coming, keep
collecting a bit longer, otherwise move on.
I see that the PushStream has methods coalesce and
window. Window seems a bit more appropriate here, as it
offers both duration and maxEvents. But it seems to
operate all the time, and not start a batch upon
receiving an event, which doesn't sound optimal in this
case. More concerning to me is the comment regarding
back-pressure. We can't use back pressure (no control over
the producer, which is implemented via whiteboard). So here
the maxEvents is more a way to limit the batch and not
to indicate a need for back pressure.
Still, that doesn't address grouping. I see that there is
a fork, but that is made to deal with a fixed number of
child streams.
Would I just be best to use a window with just duration,
collect a number of events, then move on and use a
regular stream to group them and if necessary batch them
in smaller groups?
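The last step of that idea, cutting a collected group into smaller batches, might be sketched like this. BatchSplitter and partition are hypothetical names and the batch size is arbitrary; it is plain JDK code, independent of push streams.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitter {
    /** Splits items into consecutive chunks of at most maxSize elements. */
    static <T> List<List<T>> partition(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxSize) {
            int to = Math.min(from + maxSize, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to))); // copy, not a view
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            events.add(i);
        }
        for (List<Integer> chunk : partition(events, 10)) {
            System.out.println(chunk.size() + " events: " + chunk);
        }
    }
}
```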
Cheers,
Alain
_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev
--
Jürgen Albert
Managing Director
Data In Motion Consulting GmbH
Kahlaische Str. 4
07745 Jena
Mobile: 0157-72521634
E-Mail: j.alb...@datainmotion.de
Web: www.datainmotion.de
XING: https://www.xing.com/profile/Juergen_Albert5
Legal
Jena HBR 513025