I wrote the whole of this myself this morning - I tend to do examples as unit 
tests because they’re easy for people to try out and run. 

I’m not aware of a substantial PushStream test bucket other than the OSGi 
Compliance Test Suite, which is available to members. 

If you want to start an open source PushStream implementation project then I’m 
sure it could live alongside the Promises implementation in Apache Aries!

Tim. 

> On 7 Jan 2019, at 14:01, Alain Picard <pic...@castortech.com> wrote:
> 
> Tim,
> 
> Thanks, will review and apply.
> 
> BTW, this seems to come from some of the tests. I've been looking for where the 
> tests are located, as they are often very revealing about how various aspects 
> actually work, but I have not been able to find them. Are they on GitHub 
> somewhere?
> 
> Alain
> 
> 
>> On Mon, Jan 7, 2019 at 8:11 AM Tim Ward <tim.w...@paremus.com> wrote:
>> This use case is effectively a “debouncing” behaviour, which is possible to 
>> implement with a little thought.
>> 
>> There are a couple of ways to attempt it. This one uses the asyncMap 
>> operation to asynchronously gather the events until it either times out the 
>> promise or it hits the maximum stream size. Note that you have to filter out 
>> the “empty” lists that are used to resolve the promises which are being 
>> aggregated into the window. The result of this is a window which starts on 
>> the first event arrival and then buffers the events for a while. The next 
>> window isn’t started until the next event arrives.
>> 
>> 
>> Best Regards,
>> 
>> Tim
>> 
>> 
>>      @Test
>>      public void testWindow2() throws InvocationTargetException, InterruptedException {
>> 
>>              PushStreamProvider psp = new PushStreamProvider();
>> 
>>              SimplePushEventSource<Integer> sps = psp.createSimpleEventSource(Integer.class);
>> 
>>              Promise<List<Collection<Integer>>> result = psp.createStream(sps)
>>                              .asyncMap(2, 0, new StreamDebouncer<Integer>(
>>                                              new PromiseFactory(PromiseFactory.inlineExecutor()),
>>                                              10, Duration.ofSeconds(1)))
>>                              .filter(c -> !c.isEmpty())
>>                              .collect(Collectors.toList());
>> 
>>              new Thread(() -> {
>> 
>>                      for (int i = 0; i < 200;) {
>> 
>>                              for (int j = 0; j < 23; j++) {
>>                                      sps.publish(i++);
>>                              }
>> 
>>                              try {
>>                                      System.out.println("Burst finished, now at " + i);
>>                                      Thread.sleep(2000);
>>                              } catch (InterruptedException e) {
>>                                      sps.error(e);
>>                                      break;
>>                              }
>>                      }
>> 
>>                      System.out.println("Closing generator");
>>                      sps.close();
>> 
>>              }).start();
>> 
>>              System.out.println(result.getValue().toString());
>> 
>>      }
>>      
>>      public static class StreamDebouncer<T> implements Function<T, Promise<? extends Collection<T>>> {
>> 
>>              private final PromiseFactory promiseFactory;
>>              private final int maxSize;
>>              private final Duration maxTime;
>> 
>>              private final Object lock = new Object();
>> 
>>              private List<T> currentWindow;
>>              private Deferred<Collection<T>> currentDeferred;
>> 
>>              public StreamDebouncer(PromiseFactory promiseFactory, int maxSize, Duration maxTime) {
>>                      this.promiseFactory = promiseFactory;
>>                      this.maxSize = maxSize;
>>                      this.maxTime = maxTime;
>>              }
>> 
>>              @Override
>>              public Promise<Collection<T>> apply(T t) throws Exception {
>> 
>>                      Deferred<Collection<T>> deferred = null;
>>                      Collection<T> list = null;
>>                      boolean hitMaxSize = false;
>>                      synchronized (lock) {
>>                              if (currentWindow == null) {
>>                                      currentWindow = new ArrayList<>(maxSize);
>>                                      currentDeferred = promiseFactory.deferred();
>>                                      deferred = currentDeferred;
>>                                      list = currentWindow;
>>                              }
>>                              currentWindow.add(t);
>>                              if (currentWindow.size() == maxSize) {
>>                                      hitMaxSize = true;
>>                                      deferred = currentDeferred;
>>                                      currentDeferred = null;
>>                                      list = currentWindow;
>>                                      currentWindow = null;
>>                              }
>>                      }
>> 
>>                      if (deferred != null) {
>>                              if (hitMaxSize) {
>>                                      // We must resolve this way round to avoid racing
>>                                      // the timeout and ending up with empty lists in
>>                                      // all the promises
>>                                      deferred.resolve(Collections.emptyList());
>>                                      return promiseFactory.resolved(list);
>>                              } else {
>>                                      final Collection<T> finalList = list;
>>                                      return deferred.getPromise()
>>                                                      .timeout(maxTime.toMillis())
>>                                                      .recover(x -> {
>>                                                              synchronized (lock) {
>>                                                                      if (currentWindow == finalList) {
>>                                                                              currentWindow = null;
>>                                                                              currentDeferred = null;
>>                                                                              return finalList;
>>                                                                      }
>>                                                              }
>>                                                              return Collections.emptyList();
>>                                                      });
>>                              }
>>                      } else {
>>                              return promiseFactory.resolved(Collections.emptyList());
>>                      }
>>              }
>> 
>>      }
>> 
>> 
>> 
>>> On 7 Jan 2019, at 11:25, Alain Picard via osgi-dev <osgi-dev@mail.osgi.org> 
>>> wrote:
>>> 
>>> Thanks Jürgen,
>>> 
>>> As I said, that is what I went with, and it is very satisfying right now. As 
>>> for pushback, I'm out of luck since the producer is fully "disjoint" with a 
>>> whiteboard pattern, but we have configured ourselves with appropriate 
>>> buffering. But I'll keep this code in mind, as we will surely have other 
>>> patterns to support as we use push streams more and more.
>>> 
>>> Alain
>>> 
>>> 
>>>> On Mon, Jan 7, 2019 at 5:55 AM Jürgen Albert via osgi-dev 
>>>> <osgi-dev@mail.osgi.org> wrote:
>>>> Hi Alain,
>>>> 
>>>> Windowing would be your go-to method. AFAIK there is no way to extend a 
>>>> window if you expect more messages to arrive. Thus you would need to live 
>>>> with multiple batches in case of a prolonged burst. Back pressure, however, 
>>>> is possible, even if you use a buffer and/or windowing. The solution 
>>>> would look like this:
>>>> 
>>>> psp.createPushStreamBuilder()
>>>>                         .withPushbackPolicy( q -> {
>>>>                             return Math.max(0, q.size() - 650);
>>>>                         })
>>>>                         .withQueuePolicy(QueuePolicyOption.BLOCK)
>>>>                         .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
>>>>                         .build();
>>>> 
>>>> This PushbackPolicy looks at the queue size and gradually increases the 
>>>> pushback, starting with one on the 651st element.
>>>> 
>>>> The grouping is another topic. The split method can do your grouping, if 
>>>> you know what groups to expect. It essentially returns an array of 
>>>> push streams, one for each predicate you give it. For everything 
>>>> else, you would need to do the grouping for every batch you get with the 
>>>> usual stream methods.
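
As a rough illustration of grouping each batch with the usual stream methods, the sketch below groups one batch by data source using java.util.stream only. The CommitEvent type, its fields and the groupBySource helper are hypothetical, since the thread does not show the real event class; in practice the batch would be whatever collection a window hands over.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class BatchGroupingSketch {

    // Hypothetical event type: the thread does not show the real one.
    public static final class CommitEvent {
        final String dataSource;
        final int id;

        public CommitEvent(String dataSource, int id) {
            this.dataSource = dataSource;
            this.id = id;
        }
    }

    // Groups one batch (for example, one window's worth of events) by data
    // source, keeping arrival order within each group. A TreeMap is used only
    // so that the printed key order is predictable.
    public static Map<String, List<Integer>> groupBySource(Collection<CommitEvent> batch) {
        return batch.stream().collect(Collectors.groupingBy(
                e -> e.dataSource,
                TreeMap::new,
                Collectors.mapping(e -> e.id, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<CommitEvent> batch = Arrays.asList(
                new CommitEvent("crm", 1),
                new CommitEvent("erp", 2),
                new CommitEvent("crm", 3));
        System.out.println(groupBySource(batch)); // {crm=[1, 3], erp=[2]}
    }
}
```

Each resulting list could then be processed in a single transaction per data source, which is the overhead reduction the original question was after.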
>>>> 
>>>> Regards,
>>>> 
>>>> Jürgen.
>>>> 
>>>>> On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
>>>>> For now I went with my simple solution of using a window with just 
>>>>> duration, and that is working fine, even if it might not be the most 
>>>>> optimal or streamlined approach.
>>>>> 
>>>>> Alain
>>>>> 
>>>>> 
>>>>>> On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com> 
>>>>>> wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> We are using push streams to process post-commit events. Those events 
>>>>>> originate from different data sources. At the moment we are processing 
>>>>>> those individually, but the overhead of having a transaction for each is 
>>>>>> too much. Quite often those events come in bursts following an upstream 
>>>>>> transaction/ change set.
>>>>>> 
>>>>>> The goal is to group events by data source and batch them, i.e. wait a 
>>>>>> bit when an event arrives to see if others are also coming. If they keep 
>>>>>> coming, keep collecting a bit longer; otherwise move on.
>>>>>> 
>>>>>> I see that the PushStream has methods coalesce and window. Window seems 
>>>>>> a bit more appropriate here, as it offers both duration and maxEvents. 
>>>>>> But it seems to operate all the time, and not start a batch upon 
>>>>>> receiving an event, which doesn't sound optimal in this case. More 
>>>>>> concerning to me is the comment regarding back-pressure. We can't use 
>>>>>> back pressure (no control over the producer, which is implemented via 
>>>>>> whiteboard). So here maxEvents is more a way to limit the batch than 
>>>>>> to indicate a need for back pressure.
>>>>>> 
>>>>>> Still, that doesn't address grouping. I see that there is a fork, but that 
>>>>>> is made to deal with a fixed number of child streams.
>>>>>> 
>>>>>> Would it be best to just use a window with only a duration, collect a 
>>>>>> number of events, then move on and use a regular stream to group them 
>>>>>> and, if necessary, batch them in smaller groups?
>>>>>> 
>>>>>> Cheers,
>>>>>> 
>>>>>> Alain
>>>>>> 
>>>>> 
>>>>> 
>>>>> _______________________________________________
>>>>> OSGi Developer Mail List
>>>>> osgi-dev@mail.osgi.org
>>>>> https://mail.osgi.org/mailman/listinfo/osgi-dev
>>>> 
>>>> -- 
>>>> Jürgen Albert
>>>> Managing Director
>>>> 
>>>> Data In Motion Consulting GmbH
>>>> 
>>>> Kahlaische Str. 4
>>>> 07745 Jena
>>>> 
>>>> Mobile: 0157-72521634
>>>> E-Mail: j.alb...@datainmotion.de
>>>> Web: www.datainmotion.de
>>>> 
>>>> XING:   https://www.xing.com/profile/Juergen_Albert5
>>>> 
>>>> Legal
>>>> 
>>>> Jena HBR 513025
>> 