Hi Todor,

> On 19 Mar 2019, at 15:45, Boev, Todor via osgi-dev <osgi-dev@mail.osgi.org> 
> wrote:
> 
> Hello,
>  
> Is it possible to have a push stream of push streams?
> PushStream<PushStream<T>>

Yes, this would be an asynchronously arriving set of PushStream instances. 
It’s pretty common when a mapping operation turns one event into multiple 
events. 

>  
> And if so how can I flatten it to a PushStream<T>?

The flatMap operation is your friend here, and possibly what you needed to use 
when you ended up with a PushStream<PushStream<T>>. See 
https://osgi.org/javadoc/osgi.cmpn/7.0.0/org/osgi/util/pushstream/PushStream.html#flatMap-org.osgi.util.function.Function-

In your example the first operation should have been flatMap rather than map; 
the final flatMap operation would then be unnecessary.



>  
> The example I am trying looks something like this:
>  
>     private static PushStreamProvider streams = new PushStreamProvider();
>  
>     public static void main(String[] args) throws InterruptedException {
>         // Emit 1,2,3 asynchronously and close
>         PushStream<Integer> xs = streams.createStream(list(1 ,2, 3));
>         // Emit un-connected streams each of which once connected will emit a 
> number every X seconds
>         PushStream<PushStream<Integer>> yss = xs.map(x -> 
> streams.createStream(time(x)).map(ignore -> x));
>         // Flatten all time events into one stream
>         PushStream<Integer> zs = yss.flatMap(ts -> ts);
>        
>         // This should connect all 3 timer streams and print the output for 
> 15 sec
>         // Instead only the first timer stream is connected
>         zs.forEach(i -> print(i));
>         Thread.sleep(SECONDS.toMillis(15));
>         zs.close();

The reason for this is that by default PushStreams consume events serially and 
in order (which is mostly what people want). Therefore the output of the 
flatMap is trying to consume the whole of the stream created by the first event 
before moving on to the next. The important thing to realise is that the 
flatMap expects the stream that you map to not to be infinite, or if it is 
infinite that you limit it in some way. 
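
As a rough analogy only, the same serial, in-order flattening can be seen with 
the JDK's java.util.stream.Stream, which is a pull-based API and not 
PushStream. In this sketch (the class and method names are made up for 
illustration) each input maps to an endless stream, and capping each inner 
stream is what lets flatMap finish one and move on to the next:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy using java.util.stream, NOT the OSGi PushStream API.
public class SerialFlatMapSketch {

    // Maps each of 1, 2, 3 to an endless stream repeating that value,
    // caps every inner stream at perInput elements, then flattens.
    static List<Integer> flatten(int perInput) {
        return Stream.of(1, 2, 3)
                .flatMap(x -> Stream.generate(() -> x).limit(perInput))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Inner streams are drained one after another, in input order.
        System.out.println(flatten(2)); // prints [1, 1, 2, 2, 3, 3]
    }
}
```

Without the cap on the inner stream, the first inner stream would never end 
and the values 2 and 3 would never be reached.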

To get the behaviour you want you either need to:

• up the parallelism of your stream (requiring you to know in advance how many 
input events there will be). Ordering doesn’t matter to you so this would have 
few other effects. It should not be necessary to add more threads.


• actually consume and republish the events from the generated streams rather 
than flatmapping. For example:

—————

PushStream<Integer> xs = streams.createStream(list(1, 2, 3));
// Emit un-connected streams each of which once connected will emit a number
// every X seconds
PushStream<PushStream<Integer>> yss = xs.map(x ->
        streams.createStream(time(x)).map(ignore -> x));

SimplePushEventSource<Integer> spes =
        streams.createSimplePushEventSource(Integer.class);

// The merged stream is fed by the simple event source
PushStream<Integer> mergedInfiniteStreams =
        streams.buildStream(spes).unbuffered().build();

yss.forEachEvent(pe -> {
    switch (pe.getType()) {
        case DATA:
            pe.getData().forEachEvent(pe2 -> {
                switch (pe2.getType()) {
                    case DATA:
                        spes.publish(pe2.getData());
                        // shut down producers if nobody
                        // is listening anymore.
                        return spes.isConnected() ? 0 : -1;
                    case ERROR:
                        spes.error(pe2.getFailure());
                        // Errors are bad, close the parent
                        yss.close();
                        return -1;
                    case CLOSE:
                        // No end of stream as there may
                        // be others connected
                        return -1;
                }
                // Not reached, but the compiler needs a return here
                return -1;
            });
            return 0;
        case ERROR:
            // Everyone goes bang
            spes.error(pe.getFailure());
            return -1;
        case CLOSE:
            // This doesn’t wait for the children to
            // complete, which is good as they don’t!
            spes.endOfStream();
            return -1;
    }
    // Not reached, but the compiler needs a return here
    return -1;
});

Promise<Long> p = mergedInfiniteStreams.forEach(i -> print(i));
Thread.sleep(SECONDS.toMillis(15));
mergedInfiniteStreams.close();

// Don’t miss the end of the stream when exiting
p.getFailure();

—————

This is a basic example which adds a single buffer. If you don’t want a buffer 
then you can create your own PushEventSource which pipes the events and keeps 
track of its close status (it would be pretty trivial). This would also allow 
you to have back pressure sent to the producer threads. Left as an exercise for 
the reader as I’m on vacation in Tenerife right now!
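
For anyone who wants to see the republish idea in isolation, here is a minimal 
sketch in plain JDK Java rather than the PushStream API. Everything in it is an 
assumption made for illustration: the class and method names, the 10 ms 
interval, and the bounded queue standing in for the single buffer. Each 
producer thread plays the role of one timer stream, and a full queue blocks the 
producers, which is a crude form of back pressure:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of "consume and republish"; NOT the OSGi PushStream API.
public class MergeSketch {

    // Starts one daemon thread per value. Each thread republishes its value
    // into the shared queue every 10 ms until it is interrupted.
    static List<Thread> startProducers(BlockingQueue<Integer> merged, int... values) {
        List<Thread> producers = new ArrayList<>();
        for (int v : values) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        merged.put(v); // blocks while the queue is full
                        Thread.sleep(10);
                    }
                } catch (InterruptedException e) {
                    // Interrupted: this producer has been shut down.
                }
            });
            t.setDaemon(true);
            t.start();
            producers.add(t);
        }
        return producers;
    }

    // Reads the merged queue until every (distinct) value has been seen at
    // least once or five seconds have passed, then stops the producers.
    static Set<Integer> collectDistinct(int... values) {
        BlockingQueue<Integer> merged = new LinkedBlockingQueue<>(16);
        List<Thread> producers = startProducers(merged, values);
        Set<Integer> seen = new TreeSet<>();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (seen.size() < values.length && System.nanoTime() < deadline) {
                Integer next = merged.poll(100, TimeUnit.MILLISECONDS);
                if (next != null) {
                    seen.add(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        } finally {
            producers.forEach(Thread::interrupt);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectDistinct(1, 2, 3));
    }
}
```

Because every producer writes into the same queue, no endless producer can 
starve the others, which is the property the serial flatMap lacks.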

All the best, 

Tim.

>     }
>  
>     // Starts a thread that emits an event every pause seconds
>     public static PushEventSource<Integer> time(int pause) { … }
>  
>     // Starts a thread that emits the items on the list and closes the stream
>     public static <T> PushEventSource<T> list(T... list) {…}
>  
> Regards
> -----------------------------------
> Todor Boev
> OSGi Platform
> Software AG
>  
>  
> _______________________________________________
> OSGi Developer Mail List
> osgi-dev@mail.osgi.org
> https://mail.osgi.org/mailman/listinfo/osgi-dev
