Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
The JavaDoc for the Source.from(...) method says this: Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements. There's nothing there about stream completion. I suppose it should complete normally. The name Cancellable threw me a bit. Most of the JavaDoc for the Graph variants associate cancellation with failure; for example: 'Cancels when' downstream cancels. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
There is no stream cancellation, there is only erronous or normal completion. What does the java/scaladoc say? -- Cheers, √ On 26 Jul 2015 09:17, David Pinn dp...@byandlarge.net wrote: No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Yes, that's what I'm doing. More precisely, I'm defining a composite Source like so: Initial ~ Merge ~ Akka HTTP ~ Broadcast ~ Consumer +~ ~+ | | +~~~ Throttler ~~+ The Throttler flow contains a tick source, the Cancellable materialized value of which is externalized as the materialized value of the composite Source. The Akka HTTP stage sends a GET request to a remote system. I have a small 'Runner' program that exercises the composite flow. It calls cancel after 40 seconds of execution. As you can see from the logging output, the loop closes in a less-than-happy fashion. [info] 07/26 21:35:28 INFO appspokes.onedrive.experimental.Runner - Cancelling stream [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 ERROR experimental.Runner - FAILURE! [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 INFO experimental.Runner - Shutting down [info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 DEBUG akka.stream.impl.Broadcast - fail due to: The connection closed with error Connection reset by peer [info] 07/26 21:35:58 INFO experimental.Runner - Actor system has been shut down [info] 07/26 21:35:58 ERROR akka.stream.Materializer - [Sync Response] Upstream failed. [info] akka.stream.StreamTcpException: The connection closed with error Connection reset by peer [success] Total time: 74 s, completed 26/07/2015 9:35:59 PM Now, I will confess that, much earlier in the program's execution, I'm seeing this: [info] 07/26 21:35:03 DEBUG akka.stream.impl.PrefixAndTailImpl - Cancelling akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@3216206b (after: 5000 ms) ...which I'm assuming is unrelated because it happens much earlier, and none of the error output occurs until the cancellation call is made. I'd be happy to post the code here, but it's rather verbose. The essential part is probably this (yeah, Java. sorry): builder.from(url).via(merge).via(polling(accessToken)).to(bcast.in()); builder.from(bcast.out(1)).via(throttler).via(nextLinks).to(merge); // loop-back return bcast.out(0); // outlet to the world I'll work on a minimized failing test case that does not involve HTTP requests, to see if the stream still completes with failure. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
David, does your flow contain an actual internal feedback loop? What I understand from your description is that you have something like this: { tick source ~ mapAsync calling external service ~ } ~ consumer And your return the Cancellable provided by the tick source as materialized value as your flow's materialized value. I think in this case, calling cancel() on that returned value should complete the whole flow without problems. If this is not the case, it would be best if you showed some code :) By the way, is there an issue for producing custom materialized values from user-defined graphs? I browsed through the 'streams-*' milestones and I was unable to find it. Should I file one? cheers, Rafał W dniu niedziela, 26 lipca 2015 10:24:06 UTC+2 użytkownik David Pinn napisał: The JavaDoc for the Source.from(...) method says this: Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements. There's nothing there about stream completion. I suppose it should complete normally. The name Cancellable threw me a bit. Most of the JavaDoc for the Graph variants associate cancellation with failure; for example: 'Cancels when' downstream cancels. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
I've created a test that exercises a composite source that has an internal cycle. It processes integers, starting at 1 and doubling it until it is cancelled. The code can be viewed as a gist https://gist.github.com/dpinn/cdac6709e00b7de64163. Two things to note: a) the stream completes successfully. To that extent, I've failed to reproduce the strangeness that I mentioned earlier; and b) the stream processes one more element than I expect it to. Doubling 1 Cancelling tick source = 2 Doubling 2 = 4 [error] Test cancellation.CancellationTest.testCancellation failed: assertion failed: expected OnComplete, found OnNext(4), took 4.11 sec [error] Failed: Total 1, Failed 1, Errors 0, Passed 0 [error] Failed tests: [error] cancellation.CancellationTest [error] (onedrive/test:test) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 6 s, completed 26/07/2015 11:30:15 PM The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Since we have √'s attention, allow me to repeat my original question: The question now remains, how do I fabricate a Graph[FlowShape[T, T], Cancellable] that will generate an instance of Cancellable on each materialization connected to a PushPullStage, in such way that cancel() would blow the fuse and terminate the stream? This is currently not possible, as we had not yet exposed the necessary API to users yet -- but the funcionality is there internally, we just want to gather a bit of experience before opening it up. -Endre What about this use case? Is there a ticket tracking this already? Cheers, Rafał -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Best practice for nested flows and graphs
I would really appreciate any tips for the correct direction On Thursday, July 23, 2015 at 3:02:54 PM UTC+3, ra...@qmerce.com wrote: I am implementing a simple http server which listens to requests and persists them to Cassandra. I have to split the stream in two using Broadcast so I am using Graphs. I have something like this: val bindingFuture: Future[Http.ServerBinding] = source.serverSource.to(Sink.foreach { connection = // foreach materializes the source system.log.debug(Accepted new connection from + connection.remoteAddress) // ... and then actually handle the connection connection.handleWith(source.handleRequests)(server_materializer) }).run()(server_materializer) And also val handleRequests = Flow[HttpRequest].mapAsyncUnordered(core.num_of_cores)(v = Future(requestHandler(v))) val requestHandler: HttpRequest = HttpResponse = { case HttpRequest(POST, Uri.Path(/stats), headers, entity, protocol) = val s = entity.withContentType(ContentTypes.`application/json`).getDataBytes() val g = FlowGraph.closed() { implicit builder = import FlowGraph.Implicits._ //Bla bla bla... s ~ gflow ~ bcast ~ breakEvent ~ persistToC ~ out bcast ~ out } g.run() HttpResponse(200,headers = corsHeaders, entity = Thanks!) case _: HttpRequest = HttpResponse(404, entity = Unknown resource!) } I am calling `.run` twice and from what I understand a new stream will be initiated for each request. My plan is to setup an actor that will publish the entities downstream, However I am not sure this is the correct approach. Thanks in advance, Raam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] How do I mix typed and untyped actors?
How do I mix typed and untyped actors ? As I understood I have to specify main actor when I create instance of ActorSystem like this val system: akka.typed.ActorSystem[Start] = akka.typed.ActorSystem(main, Props(mainBehaviour)) On the other hand I use akka-http which is initialized like this implicit val system = ActorSystem()implicit val executor = system.dispatcherimplicit val materializer = ActorMaterializer()// etc... I see that I can create typed system from untyped system by calling object ActorSystem { def apply(untyped: akka.actor.ActorSystem): ActorSystem[Nothing] = new Wrapper(untyped.asInstanceOf[ExtendedActorSystem])} So assuming I did val typeSystem = akka.typed.ActorSystem(untypedSystem) how do I create my first typed actor from typeSystem ? There is no typed ActorContext whose actorOf I can call. What general recommendations would you give for using Akka Typed with Akk-Http ? Thanks! http://stackoverflow.com/questions/31621607/how-do-i-mix-typed-and-untyped-actors -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
Hi, I'm still trying to figure out the best way to work with TCP flows and, while I've got something working, this seems really quite wrong, so there's gotta be a better way. What I want to do is send an Iterable[Int] from the client to the server and have the server materialize that resulting flow in a Future[Iterable[Int]]. val bytesStage = // elided... BidiFlow of serialization and framing val serverValuePromise = Promise[Seq[AnyRef]]() // Technically, the materialized value isn't important, since it's actually going to be pulled out // via the Promise val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow. wrap( // Consume the client's stream and complete the serverValuePromise with its folded result Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) = acc :+ v). mapMaterializedValue(v = { serverValuePromise.completeWith(v); v }), // We're not sending anything from this side Source.empty)(Keep.left) // The server val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle( serverConsumerFlow.join(bytesStage), 0.0.0.0, 0, halfClose = true) // We really want to stop listening once the client has successfully connected, but this is good // enough serverValuePromise.future.onComplete { case _ = serverSide.onSuccess { case binding = binding.unbind() } } // I need the endpoint where the client needs to connect val destination = Await.result(serverSide, 1.second).localAddress // Get the source running Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp(). outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run() // Print out what the client has sent to the server Await.result(serverValuePromise.future, 1.second).foreach(t = println(stt: $t)) I tried doing this the other way around - where the server side supplies source - but this caused me issues with actually shutting down the socket. Having the client do it seems to make shutting down the socket on completion of the source, just naturally occur. The problem with the server side providing the source was that the client source needed to finish properly. If I created it as `empty` then it would kill things too quickly. If I then created it as a n Actor source that just didn't do anything, I couldn't find a decent way to close it. There's gotta be a better way to do this, but I'm too much of a noob to see it. Can anyone improve this code for me? Thanks, Derek -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [Remoting] Akka Actor Refs Can be Called from another Node?
Greetings, I am working on some tech to integrate hazelcast and Akka. The thing is with Hazelcast, a particular object in the cache may be on any one of the nodes in the cluster and serializing the object from its host node to the node in the cluster where it is being processed can be expensive. So I was debating an alternative paradigm. Basically I was considering storing the actor refs to the Actor in the hazelcast map keyed by the member id. The idea is I ask the map Give me the actor ref for node 3 and it returns back an ActorRef object and then the router uses that to forward the message to the correct router on the correct node via a normal call. For example: def forward(msg: Object, key: Object) = { val uuid = hazelcast.getPartitionService().getPartition(key).getMember().getUuid() hazelcast.getMap(RoutingActors).get(uuid) ! msg } Can anyone see any issue with such a paradigm? Do I have to tell hazelcast that the actor ref is on another node? Do I need to open any ports or do any other configuration to get such a thing to work? Thanks. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Play + Akka Cluster Aware Routing?
So is the strategy I laid out flawed? I don't want to be accessing objects on the wrong node, that's a critical element of the dynamic. getting Akka and play to cooperate in cluster is not that hard. I want to make intelligent routing decisions. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: b) the stream processes one more element than I expect it to. The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value
Requesting one element at a time would lead to low-to-no concurrency and high overhead per element. Check input buffer size in Attributes. -- Cheers, √ On 26 Jul 2015 17:50, David Pinn dp...@byandlarge.net wrote: I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: b) the stream processes one more element than I expect it to. The source under test spits out the integer 4, even though the ticks were cancelled while the integer 1 was being doubled. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.