Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
The JavaDoc for the Source.from(...) method says this: Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
There is no stream cancellation, there is only erronous or normal completion. What does the java/scaladoc say? -- Cheers, √ On 26 Jul 2015 09:17, David Pinn dp...@byandlarge.net wrote: No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
Yes, that's what I'm doing. More precisely, I'm defining a composite Source like so: Initial ~ Merge ~ Akka HTTP ~ Broadcast ~ Consumer +~ ~+ | | +~~~ Throttler ~~+ The

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
David, does your flow contain an actual internal feedback loop? What I understand from your description is that you have something like this: { tick source ~ mapAsync calling external service ~ } ~ consumer And your return the Cancellable provided by the tick source as materialized value as

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I've created a test that exercises a composite source that has an internal cycle. It processes integers, starting at 1 and doubling it until it is cancelled. The code can be viewed as a gist https://gist.github.com/dpinn/cdac6709e00b7de64163. Two things to note: a) the stream completes

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
Since we have √'s attention, allow me to repeat my original question: The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: b) the stream processes one more element than I expect it to. The source under test spits out

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
Requesting one element at a time would lead to low-to-no concurrency and high overhead per element. Check input buffer size in Attributes. -- Cheers, √ On 26 Jul 2015 17:50, David Pinn dp...@byandlarge.net wrote: I think this could happen if a stage immediately requests the next element once

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-25 Thread David Pinn
This is pretty much exactly what I'm trying to do. I'm polling an external system every 20 seconds. I use a tick source to control the timing, zipping the ticks with the equivalent of your WatchRequest. The tick source materializes to a Cancellable, so that's nice. Tragically, cancelling the

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-25 Thread Viktor Klang
Do you happen to have a minimized failing test case for that? On Sat, Jul 25, 2015 at 6:36 PM, David Pinn dp...@byandlarge.net wrote: This is pretty much exactly what I'm trying to do. I'm polling an external system every 20 seconds. I use a tick source to control the timing, zipping the

[akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Rafał Krzewski
I think I've moved one step closer: I think I know how to weld a flow breaker into my graph: def watch(key: String, waitIndex: Option[Int] = None, recursive: Option[Boolean] = None, quorum: Option[Boolean] = None): Source[EtcdResponse, Cancellable] = { case class WatchRequest(key:

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Rafał Krzewski
W dniu piątek, 24 kwietnia 2015 14:24:35 UTC+2 użytkownik drewhk napisał: The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Endre Varga
Hi Rafal On Fri, Apr 24, 2015 at 2:21 PM, Rafał Krzewski rafal.krzew...@gmail.com wrote: I think I've moved one step closer: I think I know how to weld a flow breaker into my graph: def watch(key: String, waitIndex: Option[Int] = None, recursive: Option[Boolean] = None, quorum: