Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
The JavaDoc for the Source.from(...) method says this: Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements. There's nothing there about stream completion. I suppose it should complete normally. The name Cancellable threw me a bit. Most of the JavaDoc for the Graph variants associate cancellation with failure; for example: 'Cancels when' downstream cancels. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
There is no stream cancellation, there is only erronous or normal completion. What does the java/scaladoc say? -- Cheers, √ On 26 Jul 2015 09:17, David Pinn dp...@byandlarge.net wrote: No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Yes, that's what I'm doing. More precisely, I'm defining a composite Source like so: Initial ~ Merge ~ Akka HTTP ~ Broadcast ~ Consumer +~ ~+ | | +~~~ Throttler ~~+ The Throttler flow contains a tick source, the Cancellable materialized value of which is externalized as the materialized value of the composite Source. The Akka HTTP stage sends a GET request to a remote system. I have a small 'Runner' program that exercises the composite flow. It calls cancel after 40 seconds of execution. As you can see from the logging output, the loop closes in a less-than-happy fashion. [info] 07/26 21:35:28 INFO appspokes.onedrive.experimental.Runner - Cancelling stream [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 ERROR experimental.Runner - FAILURE! [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 INFO experimental.Runner - Shutting down [info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 DEBUG akka.stream.impl.Broadcast - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 INFO experimental.Runner - Actor system has been shut down [info] 07/26 21:35:58 ERROR akka.stream.Materializer - [Sync Response] Upstream failed. [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [success] Total time: 74 s, completed 26/07/2015 9:35:59 PM Now, I will confess that, much earlier in the program's execution, I'm seeing this: [info] 07/26 21:35:03 DEBUG akka.stream.impl.PrefixAndTailImpl - Cancelling akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@3216206b (after: 5000 ms) ...which I'm assuming is unrelated because it happens much earlier, and none of the error output occurs until the cancellation call is made. I'd be happy to post the code here, but it's rather verbose. The essential part is probably this (yeah, Java. sorry): builder.from(url).via(merge).via(polling(accessToken)).to(bcast.in()); builder.from(bcast.out(1)).via(throttler).via(nextLinks).to(merge); // loop-back return bcast.out(0); // outlet to the world I'll work on a minimized failing test case that does not involve HTTP requests, to see if the stream still completes with failure. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
David, does your flow contain an actual internal feedback loop? What I understand from your description is that you have something like this: { tick source ~ mapAsync calling external service ~ } ~ consumer And your return the Cancellable provided by the tick source as materialized value as your flow's materialized value. I think in this case, calling cancel() on that returned value should complete the whole flow without problems. If this is not the case, it would be best if you showed some code :) By the way, is there an issue for producing custom materialized values from user-defined graphs? I browsed through the 'streams-*' milestones and I was unable to find it. Should I file one? cheers, Rafał W dniu niedziela, 26 lipca 2015 10:24:06 UTC+2 użytkownik David Pinn napisał: The JavaDoc for the Source.from(...) method says this: Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements. There's nothing there about stream completion. I suppose it should complete normally. The name Cancellable threw me a bit. Most of the JavaDoc for the Graph variants associate cancellation with failure; for example: 'Cancels when' downstream cancels. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
I've created a test that exercises a composite source that has an internal cycle. It processes integers, starting at 1 and doubling it until it is cancelled. The code can be viewed as a gist https://gist.github.com/dpinn/cdac6709e00b7de64163. Two things to note: a) the stream completes successfully. To that extent, I've failed to reproduce the strangeness that I mentioned earlier; and b) the stream processes one more element than I expect it to. Doubling 1 Cancelling tick source = 2 Doubling 2 = 4 [error] Test cancellation.CancellationTest.testCancellation failed: assertion failed: expected OnComplete, found OnNext(4), took 4.11 sec [error] Failed: Total 1, Failed 1, Errors 0, Passed 0 [error] Failed tests: [error] cancellation.CancellationTest [error] (onedrive/test:test) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 6 s, completed 26/07/2015 11:30:15 PM The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Since we have √'s attention, allow me to repeat my original question: The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would blow the fuse and terminate the stream? This is currently not possible, as we had not yet exposed the necessary API to users yet -- but the funcionality is there internally, we just want to gather a bit of experience before opening it up. -Endre What about this use case? Is there a ticket tracking this already? Cheers, Rafał -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: b) the stream processes one more element than I expect it to. The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Requesting one element at a time would lead to low-to-no concurrency and high overhead per element. Check input buffer size in Attributes. -- Cheers, √ On 26 Jul 2015 17:50, David Pinn dp...@byandlarge.net wrote: I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: b) the stream processes one more element than I expect it to. The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
This is pretty much exactly what I'm trying to do. I'm polling an external system every 20 seconds. I use a tick source to control the timing, zipping the ticks with the equivalent of your WatchRequest. The tick source materializes to a Cancellable, so that's nice. Tragically, cancelling the Cancellable does not appear to cause completion of the breaker flow. I'd love to know how you eventually resolved this issue. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Do you happen to have a minimized failing test case for that? On Sat, Jul 25, 2015 at 6:36 PM, David Pinn dp...@byandlarge.net wrote: This is pretty much exactly what I'm trying to do. I'm polling an external system every 20 seconds. I use a tick source to control the timing, zipping the ticks with the equivalent of your WatchRequest. The tick source materializes to a Cancellable, so that's nice. Tragically, cancelling the Cancellable does not appear to cause completion of the breaker flow. I'd love to know how you eventually resolved this issue. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: streams - shutting down a flow / exposing a materialized value
I think I've moved one step closer: I think I know how to weld a flow breaker into my graph: def watch(key: String, waitIndex: Option[Int] = None, recursive: Option[Boolean] = None, quorum: Option[Boolean] = None): Source[EtcdResponse, Cancellable] = { case class WatchRequest(key: String, waitIndex: Option[Int], recursive: Option[Boolean], quorum: Option[Boolean]) val init = WatchRequest(key, waitIndex, recursive, quorum) val breaker: Graph[FlowShape[WatchRequest, WatchRequest], Cancellable] = ??? Source[EtcdResponse, Cancellable](breaker) { implicit b = import FlowGraph.Implicits._ val initReq = b.add(Source.single(init)) val reqMerge = b.add(Merge[WatchRequest](2)) val runWait = b.add(Flow[WatchRequest].mapAsync { req = this.wait(req.key, req.waitIndex, req.recursive, req.quorum).map { resp = (req.copy(waitIndex = Some(resp.node.modifiedIndex + 1)), resp) } }) val respUnzip = b.add(Unzip[WatchRequest, EtcdResponse]()) initReq ~ reqMerge.in(0) reqMerge ~ runWait runWait ~ respUnzip.in breaker = { respUnzip.out0 ~ breaker.inlet breaker.outlet ~ reqMerge.in(1) respUnzip.out1 } } } The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would blow the fuse and terminate the stream? Cheers, Rafał W dniu piątek, 24 kwietnia 2015 11:53:31 UTC+2 użytkownik Rafał Krzewski napisał: Hi, I've decided to dive into the streams ;) and implemented a client for etcd[1] using akka http client. It worked really well, and was a lot of fun! However, I'm missing one final piece that I wasn't able to figure out: The client offers a watch function that returns a stream of EtcdResponses [2] Right now the actual returned type is Source[EtcdResponse, Unit], but actually I'd like the materialized value to be akka.actor.Cancellable that would allow the client to shut down the updates stream and release it's resources. I think a custom PushPullStage exposing Cancellable interface, inserted into the flow's feedback loop could do the job of shutting down the stream, but I couldn't find a way to expose the materialized value from the FlowGraph construction block. I was looking at TCP streams and TickSource that do return interesting materialized values, but they use low level private [stream] APIs so I couldn't adapt any of that to my high-level client code. Hints will be appreciated :) Cheers, Rafał [1] https://github.com/coreos/etcd [2] https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClient.scala#L57 -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
W dniu piątek, 24 kwietnia 2015 14:24:35 UTC+2 użytkownik drewhk napisał: The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would blow the fuse and terminate the stream? This is currently not possible, as we had not yet exposed the necessary API to users yet -- but the funcionality is there internally, we just want to gather a bit of experience before opening it up. A-ha! I'm glad I made it all the way to Road closed, construction ahead sign then :) Cheers, Rafał -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Hi Rafal On Fri, Apr 24, 2015 at 2:21 PM, Rafał Krzewski rafal.krzew...@gmail.com wrote: I think I've moved one step closer: I think I know how to weld a flow breaker into my graph: def watch(key: String, waitIndex: Option[Int] = None, recursive: Option[Boolean] = None, quorum: Option[Boolean] = None): Source[EtcdResponse, Cancellable] = { case class WatchRequest(key: String, waitIndex: Option[Int], recursive: Option[Boolean], quorum: Option[Boolean]) val init = WatchRequest(key, waitIndex, recursive, quorum) val breaker: Graph[FlowShape[WatchRequest, WatchRequest], Cancellable] = ??? Source[EtcdResponse, Cancellable](breaker) { implicit b = import FlowGraph.Implicits._ val initReq = b.add(Source.single(init)) val reqMerge = b.add(Merge[WatchRequest](2)) val runWait = b.add(Flow[WatchRequest].mapAsync { req = this.wait(req.key, req.waitIndex, req.recursive, req.quorum).map { resp = (req.copy(waitIndex = Some(resp.node.modifiedIndex + 1)), resp) } }) val respUnzip = b.add(Unzip[WatchRequest, EtcdResponse]()) initReq ~ reqMerge.in(0) reqMerge ~ runWait runWait ~ respUnzip.in breaker = { respUnzip.out0 ~ breaker.inlet breaker.outlet ~ reqMerge.in(1) respUnzip.out1 } } } The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would blow the fuse and terminate the stream? This is currently not possible, as we had not yet exposed the necessary API to users yet -- but the funcionality is there internally, we just want to gather a bit of experience before opening it up. -Endre Cheers, Rafał W dniu piątek, 24 kwietnia 2015 11:53:31 UTC+2 użytkownik Rafał Krzewski napisał: Hi, I've decided to dive into the streams ;) and implemented a client for etcd[1] using akka http client. It worked really well, and was a lot of fun! However, I'm missing one final piece that I wasn't able to figure out: The client offers a watch function that returns a stream of EtcdResponses [2] Right now the actual returned type is Source[EtcdResponse, Unit], but actually I'd like the materialized value to be akka.actor.Cancellable that would allow the client to shut down the updates stream and release it's resources. I think a custom PushPullStage exposing Cancellable interface, inserted into the flow's feedback loop could do the job of shutting down the stream, but I couldn't find a way to expose the materialized value from the FlowGraph construction block. I was looking at TCP streams and TickSource that do return interesting materialized values, but they use low level private [stream] APIs so I couldn't adapt any of that to my high-level client code. Hints will be appreciated :) Cheers, Rafał [1] https://github.com/coreos/etcd [2] https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClient.scala#L57 -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.