Hi Todor,

The usage of merge is part of the “intermediate” pipeline building stage of 
creating a PushStream and therefore not suitable for what you are trying to do. 

The correct way to use merge would be:

————
Promise<Void> done = streams.buildStream(spes)
        .unbuffered()
        .build()
        .merge(streamA)
        .merge(streamB)
        .merge(streamC)
        .forEach(this::doSomething);

————

Your example does three major things wrong. 

Firstly, it calls merge multiple times on the same pipeline stage. I would 
expect this to throw an IllegalStateException as intermediate operations are 
supposed to be chained one after the other, not repeatedly called on the same 
object.

Secondly, it attempts to asynchronously add intermediate operations into the 
pipeline. This is not permitted: once a terminal operation is called the 
stream structure is fixed in place to ensure sane thread behaviour, so any 
late-arriving merge operations could never be included.

Thirdly, it assumes that the result of calling an intermediate operation will 
be the same object as the one you call the intermediate operation on. This is 
not the case for the RI, nor would I expect it to be in other implementations. 
You should always expect to use chained method calls.

If you did want to use merge to solve your problem you would need to gather the 
streams into a list and iterate over it to set up the child pipeline using the 
standard form. This, however, would not be asynchronous with respect to the 
production of the child streams. 
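
As a rough illustration of that list-based set-up, the sketch below folds a 
list of children into a pipeline by reassigning the result of each merge call. 
To keep it runnable without an OSGi runtime it uses a hypothetical stand-in 
class, Stage, rather than the real PushStream; the only property it models is 
that an intermediate operation returns a new object and leaves the receiver 
unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a pipeline stage; NOT the real PushStream API.
final class Stage {
    private final List<String> sources;

    Stage(List<String> sources) {
        this.sources = Collections.unmodifiableList(new ArrayList<>(sources));
    }

    // Like an intermediate operation: returns a NEW stage, receiver unchanged.
    Stage merge(String source) {
        List<String> next = new ArrayList<>(sources);
        next.add(source);
        return new Stage(next);
    }

    List<String> sources() {
        return sources;
    }
}

final class MergeAll {
    // Chain one merge per child, always continuing from the latest stage.
    static Stage mergeAll(Stage base, List<String> children) {
        Stage current = base;
        for (String child : children) {
            current = current.merge(child);
        }
        return current;
    }

    public static void main(String[] args) {
        Stage base = new Stage(Arrays.asList("base"));
        Stage merged = mergeAll(base, Arrays.asList("A", "B", "C"));
        System.out.println(merged.sources()); // prints [base, A, B, C]

        // Discarding the return value, as nested.forEach(merged::merge) does,
        // leaves the original stage untouched.
        base.merge("lost");
        System.out.println(base.sources()); // prints [base]
    }
}
```

With the real API the loop body would be the same reassignment, 
current = current.merge(child), over a List of PushStream instances, followed 
by a single terminal operation on the final result.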

In general I do not think that merge is a good fit for your use case. If a 
stream were to support your use case natively (i.e. without writing a custom 
consumer) it would be something more akin to an interleavedFlatMap() which 
guarantees relative ordering of the flatMapped streams, but not that the full 
contents of sub stream A are processed before the first event of sub stream B 
is processed.
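
To make that ordering guarantee concrete, here is a small sketch using plain 
lists in place of sub streams. The interleave helper is hypothetical (no such 
operation exists in the PushStream API, and round-robin is only one possible 
interleaving). It shows each sub stream keeping its own relative order while 
no sub stream has to finish before the next one starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class Interleave {
    // Round-robin over the sub streams: take one event from each in turn
    // until all of them are exhausted.
    static <T> List<T> interleave(List<List<T>> subStreams) {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (List<T> sub : subStreams) {
            iterators.add(sub.iterator());
        }
        List<T> out = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<T> it : iterators) {
                if (it.hasNext()) {
                    out.add(it.next());
                    progressed = true;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> subs = Arrays.asList(
                Arrays.asList("a1", "a2", "a3"),
                Arrays.asList("b1"),
                Arrays.asList("c1", "c2"));
        // prints [a1, b1, c1, a2, c2, a3]
        System.out.println(interleave(subs));
    }
}
```

In the output a1 still precedes a2 and a3, and c1 still precedes c2, but b1 
and c1 are processed before sub stream A has completed.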

Best Regards,

Tim

> On 20 Mar 2019, at 12:31, Boev, Todor via osgi-dev <osgi-dev@mail.osgi.org> 
> wrote:
> 
> Both solutions worked.
>  
> I tried to replace the nested callback code with PushStream.merge():
>  
>    public static <T> PushStream<T> flatten(PushStream<PushStream<T>> nested) {
>         @SuppressWarnings("unchecked")
>         SimplePushEventSource<T> spes = (SimplePushEventSource<T>) 
> streams.createSimpleEventSource(Object.class);
>  
>         PushStream<T> merged = streams.buildStream(spes).unbuffered().build();
>         // As nested streams are pulled out attach them to the merged stream
>         nested.forEach(merged::merge);
>  
>         return merged;
>    }
>  
> It did not work. The initial list source is opened and none of the nested 
> timer sources get open.
> Why is that?
> -----------------------------------
> Todor Boev
> OSGi Platform
> Software AG
>  
> From: Tim Ward [mailto:tim.w...@paremus.com] 
> Sent: Tuesday, March 19, 2019 8:44 PM
> To: Boev, Todor; OSGi Developer Mail List
> Subject: Re: [osgi-dev] Flatten a PushStream<PushStream<T>>
>  
> Hi Todor,
> 
> Sent from my iPhone
> 
> On 19 Mar 2019, at 15:45, Boev, Todor via osgi-dev <osgi-dev@mail.osgi.org> 
> wrote:
> 
> Hello,
>  
> Is it possible to have a push stream of push streams?
> PushStream<PushStream<T>>
>  
> Yes, this would be an asynchronously arriving set of PushStream instances. 
> It’s pretty common when a mapping operation turns one event into multiple 
> events. 
> 
> 
>  
> And if so how can I flatten it to a PushStream<T>?
>  
> The flatMap operation is your friend here, and possibly what you needed to 
> use when you ended up with a PushStream<PushStream<T>>. See 
> https://osgi.org/javadoc/osgi.cmpn/7.0.0/org/osgi/util/pushstream/PushStream.html#flatMap-org.osgi.util.function.Function-
>  
> Here you should have used flatMap rather than map as the first operation; 
> the final operation would then be unnecessary.
>  
>  
> 
> 
>  
> The example I am trying looks something like this:
>  
>     private static PushStreamProvider streams = new PushStreamProvider();
>  
>     public static void main(String[] args) throws InterruptedException {
>         // Emit 1,2,3 asynchronously and close
>         PushStream<Integer> xs = streams.createStream(list(1 ,2, 3));
>         // Emit un-connected streams each of which once connected will emit a 
> number every X seconds
>         PushStream<PushStream<Integer>> yss = xs.map(x -> 
> streams.createStream(time(x)).map(ignore -> x));
>         // Flatten all time events into one stream
>         PushStream<Integer> zs = yss.flatMap(ts -> ts);
>        
>         // This should connect all 3 timer streams and print the output for 
> 15 sec
>         // Instead only the first timer stream is connected
>         zs.forEach(i -> print(i));
>         Thread.sleep(SECONDS.toMillis(15));
>         zs.close();
>  
> The reason for this is that by default PushStreams consume events serially 
> and in order (which is mostly what people want). Therefore the output of the 
> flatMap is trying to consume the whole of the stream created by the first 
> event before moving on to the next. The important thing to realise is that 
> the flatMap expects the stream that you map to not to be infinite, or if it 
> is infinite that you limit it in some way. 
>  
> To get the behaviour you want you either need to:
>  
> • up the parallelism of your stream (requiring you to know in advance how 
> many input events there will be). Ordering doesn’t matter to you so this 
> would have few other effects. It should not be necessary to add more threads.
>  
>  
> • actually consume and republish the events from the generated streams rather 
> than flatmapping. For example:
>  
> —————
>  
> PushStream<Integer> xs = streams.createStream(list(1 ,2, 3));
>  // Emit un-connected streams each of which once connected will emit a number 
> every X seconds
>         PushStream<PushStream<Integer>> yss = xs.map(x -> 
> streams.createStream(time(x)).map(ignore -> x));
>  
> SimplePushEventSource<Integer> spes = 
> streams.createSimpleEventSource(Integer.class);
>  
> PushStream<Integer> mergedInfiniteStreams = 
> streams.buildStream(spes).unbuffered().build();
>  
> yss.forEachEvent(pe -> {
>  
> switch(pe.getType()) {
>     case DATA:
>         pe.getData().forEachEvent(pe2 -> {
>           switch(pe2.getType()) {
>               case DATA:
>                   spes.publish(pe2.getData());
>                   // shut down producers if nobody
>                   // is listening anymore.
>                   return spes.isConnected() ? 0 : -1;
>               case ERROR:
>                   spes.error(pe2.getFailure());
>                   // Errors are bad, close the parent
>                   yss.close();
>                   return -1;
>               case CLOSE:
>                   // No end of stream as there may
>                   // be others connected
>                   return -1;
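>               default:
>                   // Not reached for the known event types, but
>                   // the lambda must return on every path
>                   return -1;
>           }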
>         });
>         return 0;
>     case ERROR:
>          // Everyone goes bang
>          spes.error(pe.getFailure());
>          return -1;
>     case CLOSE:
>          // This doesn’t wait for the children to
>          // complete, which is good as they don’t!
>          spes.endOfStream();
>          return -1;
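>     default:
>          // Not reached for the known event types, but
>          // the lambda must return on every path
>          return -1;
>   }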
> });
>  
> Promise<Void> p = mergedInfiniteStreams.forEach(i -> print(i));
> Thread.sleep(SECONDS.toMillis(15));
> mergedInfiniteStreams.close();
> 
> 
> // Don’t miss the end of the stream when exiting
> p.getFailure();
>  
> —————
>  
> This is a basic example which adds a single buffer. If you don’t want a 
> buffer then you can create your own PushEventSource which pipes the events 
> and keeps track of its close status (it would be pretty trivial). This would 
> also allow you to have back pressure sent to the producer threads. Left as an 
> exercise for the reader as I’m on vacation in Tenerife right now!
>  
> All the best, 
>  
> Tim.
> 
> 
>     }
>  
>     // Starts a thread that emits an event every pause seconds
>     public static PushEventSource<Integer> time(int pause) { … }
>  
>     // Starts a thread that emits the items on the list and closes the stream
>     public static <T> PushEventSource<T> list(T... list) {…}
>  
> Regards
> -----------------------------------
> Todor Boev
> OSGi Platform
> Software AG
>  
>  
> _______________________________________________
> OSGi Developer Mail List
> osgi-dev@mail.osgi.org
> https://mail.osgi.org/mailman/listinfo/osgi-dev