Both solutions worked.

I tried to replace the nested callback code with PushStream.merge():

    public static <T> PushStream<T> flatten(PushStream<PushStream<T>> nested) {
        @SuppressWarnings("unchecked")
        SimplePushEventSource<T> spes = (SimplePushEventSource<T>)
                streams.createSimpleEventSource(Object.class);

        PushStream<T> merged = streams.buildStream(spes).unbuffered().build();
        // As nested streams are pulled out attach them to the merged stream
        nested.forEach(merged::merge);

        return merged;
    }

It did not work. The initial list source is opened, but none of the nested
timer sources are ever opened.
Why is that?
-----------------------------------
Todor Boev
OSGi Platform
Software AG

From: Tim Ward [mailto:tim.w...@paremus.com]
Sent: Tuesday, March 19, 2019 8:44 PM
To: Boev, Todor; OSGi Developer Mail List
Subject: Re: [osgi-dev] Flatten a PushStream<PushStream<T>>

Hi Todor,

On 19 Mar 2019, at 15:45, Boev, Todor via osgi-dev <osgi-dev@mail.osgi.org> wrote:

Hello,



Is it possible to have a push stream of push streams?

PushStream<PushStream<T>>

Yes, this would be an asynchronously arriving set of PushStream instances.
It’s pretty common when a mapping operation turns one event into multiple
events.

And if so how can I flatten it to a PushStream<T>?

The flatMap operation is your friend here, and possibly what you needed to use 
when you ended up with a PushStream<PushStream<T>>. See 
https://osgi.org/javadoc/osgi.cmpn/7.0.0/org/osgi/util/pushstream/PushStream.html#flatMap-org.osgi.util.function.Function-

Here you should have used flatMap rather than map as the first operation; the
final operation would then be unnecessary.

The example I am trying looks something like this:

    private static PushStreamProvider streams = new PushStreamProvider();

    public static void main(String[] args) throws InterruptedException {
        // Emit 1,2,3 asynchronously and close
        PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

        // Emit un-connected streams each of which, once connected, will emit
        // a number every X seconds
        PushStream<PushStream<Integer>> yss = xs.map(x ->
                streams.createStream(time(x)).map(ignore -> x));

        // Flatten all time events into one stream
        PushStream<Integer> zs = yss.flatMap(ts -> ts);

        // This should connect all 3 timer streams and print the output for 15 sec
        // Instead only the first timer stream is connected
        zs.forEach(i -> print(i));
        Thread.sleep(SECONDS.toMillis(15));
        zs.close();

The reason for this is that by default PushStreams consume events serially and 
in order (which is mostly what people want). Therefore the output of the 
flatMap is trying to consume the whole of the stream created by the first event 
before moving on to the next. The important thing to realise is that the 
flatMap expects the stream that you map to not to be infinite, or if it is 
infinite that you limit it in some way.
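
To make the serial behaviour concrete, here is a small library-independent
model in plain Java. It does not use the PushStream API; the class
SerialFlatten and its methods are hypothetical names invented for this
illustration, and each timer stream is modelled as an endless iterator. A
flatten that drains each inner source in order never gets past the first
endless one, whereas limiting each inner source lets the later ones be reached.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical model only: iterators stand in for the nested timer streams.
class SerialFlatten {

    // Stand-in for a timer stream: emits the same value forever.
    public static Iterator<Integer> endless(int value) {
        return Stream.generate(() -> value).iterator();
    }

    // Three endless inner sources, like the streams created for 1, 2 and 3.
    public static List<Iterator<Integer>> sources() {
        List<Iterator<Integer>> inners = new ArrayList<>();
        inners.add(endless(1));
        inners.add(endless(2));
        inners.add(endless(3));
        return inners;
    }

    // Drains each inner source completely before moving to the next one.
    // maxEvents only exists so that the demonstration terminates.
    public static List<Integer> flattenSerially(List<Iterator<Integer>> inners, int maxEvents) {
        List<Integer> out = new ArrayList<>();
        for (Iterator<Integer> inner : inners) {
            while (inner.hasNext()) {
                if (out.size() == maxEvents) {
                    return out;
                }
                out.add(inner.next());
            }
        }
        return out;
    }

    // Same serial order, but each inner source is limited to perInner events.
    public static List<Integer> flattenLimited(List<Iterator<Integer>> inners, int perInner) {
        List<Integer> out = new ArrayList<>();
        for (Iterator<Integer> inner : inners) {
            for (int i = 0; i < perInner && inner.hasNext(); i++) {
                out.add(inner.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(flattenSerially(sources(), 6)); // [1, 1, 1, 1, 1, 1]
        System.out.println(flattenLimited(sources(), 2));  // [1, 1, 2, 2, 3, 3]
    }
}
```

The first call only ever sees events from the first source, which matches the
symptom of a single connected timer stream.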

To get the behaviour you want you either need to:

• up the parallelism of your stream (requiring you to know in advance how many 
input events there will be). Ordering doesn’t matter to you so this would have 
few other effects. It should not be necessary to add more threads.


• actually consume and republish the events from the generated streams rather 
than flatmapping. For example:

—————


PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

// Emit un-connected streams each of which, once connected, will emit a
// number every X seconds
PushStream<PushStream<Integer>> yss = xs.map(x ->
        streams.createStream(time(x)).map(ignore -> x));

SimplePushEventSource<Integer> spes =
        streams.createSimpleEventSource(Integer.class);

PushStream<Integer> mergedInfiniteStreams =
        streams.buildStream(spes).unbuffered().build();

yss.forEachEvent(pe -> {
    switch (pe.getType()) {
        case DATA:
            pe.getData().forEachEvent(pe2 -> {
                switch (pe2.getType()) {
                    case DATA:
                        spes.publish(pe2.getData());
                        // shut down producers if nobody
                        // is listening anymore.
                        return spes.isConnected() ? 0 : -1;
                    case ERROR:
                        spes.error(pe2.getFailure());
                        // Errors are bad, close the parent
                        yss.close();
                        return -1;
                    case CLOSE:
                    default:
                        // No end of stream as there may
                        // be others connected
                        return -1;
                }
            });
            return 0;
        case ERROR:
            // Everyone goes bang
            spes.error(pe.getFailure());
            return -1;
        case CLOSE:
        default:
            // This doesn’t wait for the children to
            // complete, which is good as they don’t!
            spes.endOfStream();
            return -1;
    }
});

Promise<Void> p = mergedInfiniteStreams.forEach(i -> print(i));

Thread.sleep(SECONDS.toMillis(15));

mergedInfiniteStreams.close();

// Don’t miss the end of the stream when exiting
p.getFailure();

—————

This is a basic example which adds a single buffer. If you don’t want a buffer 
then you can create your own PushEventSource which pipes the events and keeps 
track of its close status (it would be pretty trivial). This would also allow 
you to have back pressure sent to the producer threads. Left as an exercise for 
the reader as I’m on vacation in Tenerife right now!
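
As a rough starting point for that exercise, the following is a
library-independent model of the idea in plain Java. It is not the OSGi
PushEventSource interface: the class MergingPipe and all of its methods are
hypothetical names made up for this sketch, and it assumes a single downstream
consumer. It shows only the two points mentioned above, piping events through
without a buffer and keeping track of the close status so that producers can
be told to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model, NOT the OSGi PushEventSource interface.
class MergingPipe<T> {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile Consumer<? super T> downstream;

    // Connect the single downstream consumer.
    public void open(Consumer<? super T> consumer) {
        downstream = consumer;
    }

    // Record the close so that every producer is told to stop.
    public void close() {
        closed.set(true);
    }

    public boolean isClosed() {
        return closed.get();
    }

    // Hand the event straight to the consumer on the producer's thread,
    // so there is no buffer. Returns 0 to continue or -1 to ask the
    // producer to stop, like the return values in the example above.
    public long publish(T event) {
        Consumer<? super T> target = downstream;
        if (closed.get() || target == null) {
            return -1;
        }
        target.accept(event);
        return 0;
    }

    public static void main(String[] args) {
        MergingPipe<Integer> pipe = new MergingPipe<>();
        List<Integer> seen = new ArrayList<>();
        pipe.open(seen::add);
        System.out.println(pipe.publish(1)); // 0
        pipe.close();
        System.out.println(pipe.publish(2)); // -1
        System.out.println(seen);            // [1]
    }
}
```

Because the consumer runs on the producer's thread in this model, a slow
consumer slows the producer down directly. A real implementation would also
have to forward error and close events and cope with several producers
publishing at the same time.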

All the best,

Tim.



    }



    // Starts a thread that emits an event every pause seconds

    public static PushEventSource<Integer> time(int pause) { … }



    // Starts a thread that emits the items on the list and closes the stream

    public static <T> PushEventSource<T> list(T... list) {…}



Regards

_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev