Both solutions worked.
I tried to replace the nested callback code with PushStream.merge():
public static <T> PushStream<T> flatten(PushStream<PushStream<T>> nested) {
    @SuppressWarnings("unchecked")
    SimplePushEventSource<T> spes = (SimplePushEventSource<T>)
            streams.createSimpleEventSource(Object.class);
    PushStream<T> merged = streams.buildStream(spes).unbuffered().build();
    // As nested streams are pulled out attach them to the merged stream
    nested.forEach(merged::merge);
    return merged;
}
It did not work. The initial list source is opened, but none of the nested
timer sources are ever opened.
Why is that?
-----------------------------------
Todor Boev
OSGi Platform
Software AG
From: Tim Ward [mailto:[email protected]]
Sent: Tuesday, March 19, 2019 8:44 PM
To: Boev, Todor; OSGi Developer Mail List
Subject: Re: [osgi-dev] Flatten a PushStream<PushStream<T>>
Hi Todor,
On 19 Mar 2019, at 15:45, Boev, Todor via osgi-dev <[email protected]> wrote:
Hello,
Is it possible to have a push stream of push streams?
PushStream<PushStream<T>>
Yes, this would be an asynchronously arriving set of PushStream instances.
It’s pretty common when a mapping operation turns one event into multiple
events.
And if so how can I flatten it to a PushStream<T>?
The flatMap operation is your friend here, and possibly what you needed to use
when you ended up with a PushStream<PushStream<T>>. See
https://osgi.org/javadoc/osgi.cmpn/7.0.0/org/osgi/util/pushstream/PushStream.html#flatMap-org.osgi.util.function.Function-
Here you should not have used map as the first operation, but flatMap, then the
final operation would be unnecessary.
The example I am trying looks something like this:
private static PushStreamProvider streams = new PushStreamProvider();

public static void main(String[] args) throws InterruptedException {
    // Emit 1,2,3 asynchronously and close
    PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

    // Emit un-connected streams each of which once connected will emit a
    // number every X seconds
    PushStream<PushStream<Integer>> yss = xs.map(x ->
            streams.createStream(time(x)).map(ignore -> x));

    // Flatten all time events into one stream
    PushStream<Integer> zs = yss.flatMap(ts -> ts);

    // This should connect all 3 timer streams and print the output for 15
    // sec
    // Instead only the first timer stream is connected
    zs.forEach(i -> print(i));
    Thread.sleep(SECONDS.toMillis(15));
    zs.close();
The reason for this is that by default PushStreams consume events serially and
in order (which is mostly what people want). Therefore the output of the
flatMap is trying to consume the whole of the stream created by the first event
before moving on to the next. The important thing to realise is that the
flatMap expects the stream that you map to not to be infinite, or if it is
infinite that you limit it in some way.
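The serial, in-order consumption described above can be modelled without the PushStream API at all. The following is only a sketch using plain JDK iterators as a pull-based analogy: `SerialFlattenModel`, `repeatForever`, `limit` and `flattenSerially` are hypothetical names invented for this illustration and are not part of OSGi. It shows that draining each inner source to completion starves the later ones when the first never ends, and that limiting the inner sources lets the flattening move on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Pull-based analogy only; this is NOT the OSGi PushStream API.
class SerialFlattenModel {

    // Hypothetical stand-in for a timer source: yields x forever.
    static Iterator<Integer> repeatForever(int x) {
        return new Iterator<Integer>() {
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { return x; }
        };
    }

    // Hypothetical stand-in for limiting an inner source to at most n events.
    static <T> Iterator<T> limit(Iterator<T> source, int n) {
        return new Iterator<T>() {
            private int seen = 0;
            @Override public boolean hasNext() { return seen < n && source.hasNext(); }
            @Override public T next() { seen++; return source.next(); }
        };
    }

    // Serial flattening: each inner source is drained completely before the
    // next outer item is looked at. Collection stops after max events so the
    // model terminates even when an inner source is infinite.
    static <T, R> List<R> flattenSerially(List<T> outer,
                                          Function<T, Iterator<R>> mapper,
                                          int max) {
        List<R> out = new ArrayList<>();
        for (T item : outer) {
            Iterator<R> inner = mapper.apply(item);
            while (inner.hasNext()) {
                if (out.size() == max) {
                    return out;
                }
                out.add(inner.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(1, 2, 3);
        // Infinite inner sources: only the first one is ever consumed.
        System.out.println(flattenSerially(xs, SerialFlattenModel::repeatForever, 6));
        // prints [1, 1, 1, 1, 1, 1]

        // Limited inner sources: every outer item gets its turn.
        System.out.println(flattenSerially(xs, x -> limit(repeatForever(x), 2), 6));
        // prints [1, 1, 2, 2, 3, 3]
    }
}
```

In the first call the values 2 and 3 never appear, which mirrors only the first timer stream being connected in the example above.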
To get the behaviour you want you either need to:
• up the parallelism of your stream (requiring you to know in advance how many
input events there will be). Ordering doesn’t matter to you so this would have
few other effects. It should not be necessary to add more threads.
• actually consume and republish the events from the generated streams rather
than flatmapping. For example:
—————
PushStream<Integer> xs = streams.createStream(list(1, 2, 3));

// Emit un-connected streams each of which once connected will emit a number
// every X seconds
PushStream<PushStream<Integer>> yss = xs.map(x ->
        streams.createStream(time(x)).map(ignore -> x));

SimplePushEventSource<Integer> spes = streams.createSimpleEventSource(Integer.class);
PushStream<Integer> mergedInfiniteStreams =
        streams.buildStream(spes).unbuffered().build();

yss.forEachEvent(pe -> {
    switch (pe.getType()) {
        case DATA:
            pe.getData().forEachEvent(pe2 -> {
                switch (pe2.getType()) {
                    case DATA:
                        spes.publish(pe2.getData());
                        // shut down producers if nobody
                        // is listening anymore.
                        return spes.isConnected() ? 0 : -1;
                    case ERROR:
                        spes.error(pe2.getFailure());
                        // Errors are bad, close the parent
                        yss.close();
                        return -1;
                    case CLOSE:
                    default:
                        // No end of stream as there may
                        // be others connected
                        return -1;
                }
            });
            return 0;
        case ERROR:
            // Everyone goes bang
            spes.error(pe.getFailure());
            return -1;
        case CLOSE:
        default:
            // This doesn’t wait for the children to
            // complete, which is good as they don’t!
            spes.endOfStream();
            return -1;
    }
});

Promise<Void> p = mergedInfiniteStreams.forEach(i -> print(i));
Thread.sleep(SECONDS.toMillis(15));
mergedInfiniteStreams.close();
// Don’t miss the end of the stream when exiting
p.getFailure();
—————
This is a basic example which adds a single buffer. If you don’t want a buffer
then you can create your own PushEventSource which pipes the events and keeps
track of its close status (it would be pretty trivial). This would also allow
you to have back pressure sent to the producer threads. Left as an exercise for
the reader as I’m on vacation in Tenerife right now!
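The consume-and-republish idea, with its single shared buffer, can also be modelled using only JDK threads and a queue. This is a sketch under stated assumptions, not the PushStream implementation: `RepublishMergeModel` and `mergeByRepublishing` are hypothetical names, each producer emits a fixed number of events instead of running forever, and the unbounded `LinkedBlockingQueue` plays the role of the buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Thread-and-queue analogy only; this is NOT the OSGi PushStream API.
class RepublishMergeModel {

    // Starts one producer thread per value (values must be non-empty). Each
    // producer republishes its value eventsPerSource times into one shared
    // queue. Returns everything the consumer received, sorted, because the
    // arrival order across producers is not deterministic.
    static List<Integer> mergeByRepublishing(List<Integer> values, int eventsPerSource) {
        BlockingQueue<Integer> merged = new LinkedBlockingQueue<>();
        ExecutorService producers = Executors.newFixedThreadPool(values.size());
        for (Integer value : values) {
            producers.submit(() -> {
                for (int i = 0; i < eventsPerSource; i++) {
                    merged.add(value);
                }
            });
        }
        // No new tasks are accepted; already submitted producers still run.
        producers.shutdown();

        List<Integer> received = new ArrayList<>();
        int expected = values.size() * eventsPerSource;
        try {
            while (received.size() < expected) {
                Integer event = merged.poll(5, TimeUnit.SECONDS);
                if (event == null) {
                    break; // give up rather than wait forever
                }
                received.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Collections.sort(received);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(mergeByRepublishing(Arrays.asList(1, 2, 3), 2));
        // prints [1, 1, 2, 2, 3, 3]
    }
}
```

Because all producers run concurrently, none of them has to finish before the others are heard. Swapping the unbounded queue for a bounded one and calling `put` instead of `add` would make producers block while the queue is full, which is one simple way to picture back pressure reaching the producer threads.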
All the best,
Tim.
}
// Starts a thread that emits an event every pause seconds
public static PushEventSource<Integer> time(int pause) { … }
// Starts a thread that emits the items on the list and closes the stream
public static <T> PushEventSource<T> list(T... list) {…}
Regards
-----------------------------------
Todor Boev
OSGi Platform
Software AG
_______________________________________________
OSGi Developer Mail List
[email protected]
https://mail.osgi.org/mailman/listinfo/osgi-dev