Both solutions worked. I tried to replace the nested callback code with PushStream.merge():

    public static <T> PushStream<T> flatten(PushStream<PushStream<T>> nested) {
        @SuppressWarnings("unchecked")
        SimplePushEventSource<T> spes =
            (SimplePushEventSource<T>) streams.createSimpleEventSource(Object.class);
        PushStream<T> merged = streams.buildStream(spes).unbuffered().build();

        // As nested streams are pulled out attach them to the merged stream
        nested.forEach(merged::merge);

        return merged;
    }

It did not work. The initial list source is opened, but none of the nested timer sources gets opened. Why is that?

-----------------------------------
Todor Boev
OSGi Platform
Software AG

From: Tim Ward [mailto:tim.w...@paremus.com]
Sent: Tuesday, March 19, 2019 8:44 PM
To: Boev, Todor; OSGi Developer Mail List
Subject: Re: [osgi-dev] Flatten a PushStream<PushStream<T>>

Hi Todor,

Sent from my iPhone

On 19 Mar 2019, at 15:45, Boev, Todor via osgi-dev <osgi-dev@mail.osgi.org> wrote:

Hello,

Is it possible to have a push stream of push streams? PushStream<PushStream<T>>

Yes, this would be an asynchronously arriving set of PushStream instances. It’s pretty common when a mapping operation turns one event into multiple events.

And if so how can I flatten it to a PushStream<T>?

The flatMap operation is your friend here, and possibly what you needed to use when you ended up with a PushStream<PushStream<T>>. See https://osgi.org/javadoc/osgi.cmpn/7.0.0/org/osgi/util/pushstream/PushStream.html#flatMap-org.osgi.util.function.Function-

Here you should not have used map as the first operation, but flatMap; then the final operation would be unnecessary.

The example I am trying looks something like this:

    private static PushStreamProvider streams = new PushStreamProvider();

    public static void main(String[] args) throws InterruptedException {
        // Emit 1,2,3 asynchronously and close
        PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

        // Emit un-connected streams each of which once connected will emit a number every X seconds
        PushStream<PushStream<Integer>> yss = xs.map(x -> streams.createStream(time(x)).map(ignore -> x));

        // Flatten all time events into one stream
        PushStream<Integer> zs = yss.flatMap(ts -> ts);

        // This should connect all 3 timer streams and print the output for 15 sec
        // Instead only the first timer stream is connected
        zs.forEach(i -> print(i));

        Thread.sleep(SECONDS.toMillis(15));

        zs.close();

The reason for this is that by default PushStreams consume events serially and in order (which is mostly what people want). Therefore the output of the flatMap is trying to consume the whole of the stream created by the first event before moving on to the next. The important thing to realise is that the flatMap expects the stream that you map to not to be infinite, or if it is infinite that you limit it in some way.

To get the behaviour you want you either need to:

• up the parallelism of your stream (requiring you to know in advance how many input events there will be). Ordering doesn’t matter to you so this would have few other effects. It should not be necessary to add more threads.
• actually consume and republish the events from the generated streams rather than flatmapping.

For example:

-----
    PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

    // Emit un-connected streams each of which once connected will emit a number every X seconds
    PushStream<PushStream<Integer>> yss = xs.map(x -> streams.createStream(time(x)).map(ignore -> x));

    SimplePushEventSource<Integer> spes = streams.createSimpleEventSource(Integer.class);
    PushStream<Integer> mergedInfiniteStreams = streams.buildStream(spes).unbuffered().build();

    yss.forEachEvent(pe -> {
        switch (pe.getType()) {
            case DATA:
                pe.getData().forEachEvent(pe2 -> {
                    switch (pe2.getType()) {
                        case DATA:
                            spes.publish(pe2.getData());
                            // shut down producers if nobody
                            // is listening anymore.
                            return spes.isConnected() ? 0 : -1;
                        case ERROR:
                            spes.error(pe2.getFailure());
                            // Errors are bad, close the parent
                            yss.close();
                            return -1;
                        case CLOSE:
                        default:
                            // No end of stream as there may
                            // be others connected
                            return -1;
                    }
                });
                return 0;
            case ERROR:
                // Everyone goes bang
                spes.error(pe.getFailure());
                return -1;
            case CLOSE:
            default:
                // This doesn’t wait for the children to
                // complete, which is good as they don’t!
                spes.endOfStream();
                return -1;
        }
    });

    Promise<Void> p = mergedInfiniteStreams.forEach(i -> print(i));

    Thread.sleep(SECONDS.toMillis(15));

    mergedInfiniteStreams.close();

    // Don’t miss the end of the stream when exiting
    p.getFailure();
-----

This is a basic example which adds a single buffer. If you don’t want a buffer then you can create your own PushEventSource which pipes the events and keeps track of its close status (it would be pretty trivial). This would also allow you to have back pressure sent to the producer threads. Left as an exercise for the reader as I’m on vacation in Tenerife right now!

All the best,

Tim.

    }

    // Starts a thread that emits an event every pause seconds
    public static PushEventSource<Integer> time(int pause) { … }

    // Starts a thread that emits the items on the list and closes the stream
    public static <T> PushEventSource<T> list(T... list) { … }

Regards
-----------------------------------
Todor Boev
OSGi Platform
Software AG

_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev
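
On the exercise left to the reader in Tim's reply (a source that pipes events through and keeps track of its own close status), the following is only a rough sketch of the idea in plain Java, not OSGi code. Pipe, producer and run are hypothetical names made up for illustration, the 10 ms timing is arbitrary, and the blocking publish call is just one assumed way of pushing back on producer threads. It borrows the convention from the thread that a negative return value tells a producer to stop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Plain-Java stand-in for the idea; none of these types are OSGi PushStream API.
final class MergingPipeSketch {

    /** Hypothetical pipe: forwards items downstream until it is closed. */
    static final class Pipe<T> {
        private final Consumer<T> downstream;
        private boolean closed; // guarded by "this"

        Pipe(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        /** Returns 0 to continue, or -1 to tell the calling producer to stop. */
        synchronized long publish(T item) {
            if (closed) {
                return -1;
            }
            // Producers block here while another delivery is in progress.
            downstream.accept(item);
            return 0;
        }

        synchronized void close() {
            closed = true;
        }
    }

    /** Starts an endless producer that publishes value, pausing value * 10 ms between events. */
    static Thread producer(int value, Pipe<Integer> pipe) {
        Thread t = new Thread(() -> {
            try {
                while (pipe.publish(value) >= 0) {
                    Thread.sleep(value * 10L);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    /** Merges three endless producers and closes the pipe once minEvents events have arrived. */
    static List<Integer> run(int minEvents) throws InterruptedException {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch enough = new CountDownLatch(minEvents);
        Pipe<Integer> pipe = new Pipe<>(i -> {
            seen.add(i);
            enough.countDown();
        });

        Thread[] producers = {
            producer(1, pipe), producer(2, pipe), producer(3, pipe)
        };

        enough.await();      // wait for enough merged events
        pipe.close();        // each producer's next publish returns -1
        for (Thread t : producers) {
            t.join();        // all producers observe the close and exit
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(9));
    }
}
```

All three producers run at the same time and share one sink, so no endless source starves the others, and closing the pipe is what shuts the producers down. A real PushEventSource would additionally have to forward error and close events, which this sketch leaves out.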