Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]
Hi,

I also wanted to use an ActorPublisher/ActorSubscriber pair. My use case was related to a dynamically scaled number of workers on a transforming task, with possible worker failure, restart and redo (based on the Work Pulling pattern by Derek Wyatt: http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2 ). I wanted to use streams for all other processing, but this particular stage needs to be dynamically scaled, so I cannot use the cookbook pattern "Balancing jobs to a fixed pool of workers", and I am not yet sure whether I can use the actor ask pattern from the "Globally limiting the rate of a set of streams" example to achieve the desired result.

Independently of my particular problem, I think the question could be restated as: what is the best practice for interfacing a flow transformer in Akka Streams with other parts of a system that use actors? To be more exact: is it possible for a Flow transformer (Sink and Source already work) to receive and process actor messages from the external world in a thread-safe way, going beyond the actor ask pattern from the Limiter example? I think it implies that the underlying actor must be the same instance for stream management and custom processing, as it is for an ActorSubscriber or ActorPublisher.

I imagine the API would extend the existing PropsSource and PropsSink and add a PropsFlow:

    trait ActorFlow[In, Out] extends ActorPublisher[Out] with ActorSubscriber { .. }

    class MyActorFlow extends ActorFlow[In, Out] { ... }

    val props = Props(classOf[MyActorFlow])
    val flowActor = PropsFlow[In, Out](props)
    val source = ...
    val sink = ...
    val flowGraph = FlowGraph { implicit builder =>
      source ~> flowActor ~> sink
    }

I think all the pieces are already available in the ActorPublisher and ActorSubscriber infrastructure, and the missing part is some API code to support the ActorFlow in my example.
Best regards,
Piotr Buszka

On Friday, 26 December 2014 02:51:19 UTC+1, Konrad Malawski wrote:
> Hi Mark,
>
> would you mind explaining what exact use case you’re trying to cover? Instead of dropping down to the reactive streams interfaces (publisher / subscriber), which are *very low-level*, you could perhaps stick with our much simpler to use custom Stages.
>
> Here are the docs on implementing custom stages: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html
>
> And there are also plenty of examples in the stream cookbook: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html which could be useful when implementing your custom processing stages.
>
> Hope this helps, happy hakking!
>
> --
> Konrad 'ktoso' Malawski
> hAkker @ typesafe
> http://akka.io
>
> On 25 December 2014 at 14:13:32, Mark van Buskirk (markvan...@gmail.com) wrote:
> > I'd love to see an example of two actors that are both publisher and subscriber, connected to each other. I've been trying to get that going and have been having a bunch of trouble getting it working.

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
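The actor ask pattern that Piotr's message refers to can be sketched roughly as below. This is only an illustration under stated assumptions: it uses the API of a later Akka release (2.6, classic actors), whose signatures differ from the 0.9 / 1.0-M2 previews discussed in this thread, and the `Doubler` actor and `AskFlowSketch` object are hypothetical names invented for the example. It does not implement the proposed `ActorFlow`/`PropsFlow`; it only shows a stream stage handing each element to an actor and waiting for the reply.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical worker: doubles every Int it receives and replies to the sender.
class Doubler extends Actor {
  def receive: Receive = { case n: Int => sender() ! n * 2 }
}

object AskFlowSketch {
  def run(): Seq[Int] = {
    // In Akka 2.6 the implicit ActorSystem also provides the stream materializer.
    implicit val system: ActorSystem = ActorSystem("ask-flow-sketch")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val worker = system.actorOf(Props(new Doubler), "doubler")

      // Each element is sent to the actor with `ask`; at most 4 asks are in
      // flight at a time, so a slow actor backpressures the upstream source.
      val viaActor = Flow[Int].mapAsync(parallelism = 4)(n => (worker ? n).mapTo[Int])

      Await.result(Source(1 to 5).via(viaActor).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the stage only holds an `ActorRef`, the reference could presumably point at a router or a work-pulling master instead of a single worker; whether that is enough for the dynamic scaling described above is exactly the open question in this thread.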
Re: [akka-user] Recording latency of operations that return a Future inside an akka stream
Hi Soumya,

I don’t think what you’ll end up measuring this way will be very useful. Between the completion of the future and the triggering of the map there are multiple asynchronous boundaries… So you won’t be measuring how fast the set operation was, but how much time passed between these asynchronous boundaries, which could have been backpressured, by the way.

I suggest directly wrapping the set call with your measurement logic instead, since that seems to be what you want to measure.

By the way, we do have a “timed” element in our extras section: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/extra/Timed.scala

You can `import Timed._` and then use it as shown here: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala

It’s a rather old element and I’m not sure if we’ll be keeping it, but you can use it as a source of inspiration in case you end up needing that kind of measurement.

On 26 December 2014 at 05:46:55, Soumya Simanta (soumya.sima...@gmail.com) wrote:
> This is related to this thread but sufficiently different that I decided to create a new thread. Hope that's okay.
>
> I would like to create a histogram of the latency of a large number of set operations (set returns a Future[Boolean]) using LatencyUtils.
>
> Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k => redis.set(k + rnd, message))) and then somehow record the end time in a map operation (.map( /* record the end time here */ )) after this. I'm having a hard time trying to figure this out.
>
> My understanding is that even though mapAsyncUnordered doesn't maintain the order of operations, the map following the mapAsyncUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct?
>     val redis = RedisClient("localhost")
>     val random1 = UUID.randomUUID().toString
>
>     def insertValues(rnd: String): Flow[Int, Boolean] = {
>       Flow[Int]
>         .mapAsyncUnordered(k => redis.set(k + rnd, message))
>         .map( /* record the end time here */ )
>     }
>
>     val blackhole = BlackholeSink
>     val maxSeq = 500
>     val seqSource = Source( () => (1 to maxSeq).iterator )
>     val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)
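Konrad's suggestion of wrapping the set call itself, rather than timing across stream stages, might look something like the sketch below. It uses only the Scala standard library. The names `LatencySketch`, `timed`, `fakeSet` and `latenciesNanos` are hypothetical: `fakeSet` stands in for `redis.set`, and the queue stands in for whatever recorder (for example a LatencyUtils histogram) is actually used.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LatencySketch {
  // Recorded latencies in nanoseconds; a stand-in for a real histogram recorder.
  val latenciesNanos = new ConcurrentLinkedQueue[Long]()

  // Starts the clock just before the operation is invoked and records the
  // elapsed time when its Future completes (successfully or not). `andThen`
  // returns a Future that completes only after the recording has run.
  def timed[T](op: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val start = System.nanoTime()
    op.andThen { case _ => latenciesNanos.add(System.nanoTime() - start) }
  }

  // Hypothetical stand-in for redis.set: completes with true after a short delay.
  def fakeSet(key: String, value: String)(implicit ec: ExecutionContext): Future[Boolean] =
    Future { Thread.sleep(5); true }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val writes = Future.sequence((1 to 10).map(k => timed(fakeSet(s"key-$k", "message"))))
    Await.result(writes, 5.seconds)
    println(s"recorded ${latenciesNanos.size()} latencies")
  }
}
```

In the stream from the question, the same wrapper would presumably be applied inside the asynchronous stage, for example `mapAsyncUnordered(k => timed(redis.set(k + rnd, message)))`, so that the measurement covers only the set call and not the asynchronous boundaries after it.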
Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]
I think I'm clinging to trying to use actors as both publisher and subscriber because I like actor supervision and I don't see how you do supervision with streams. Does a custom Stage / transformer have supervision, where exceptions/errors can cause a restart/resume/escalate that affects a particular stream element but lets the following elements continue on?

What am I missing? I feel like there is something big here that I'm just not seeing. I want backpressure, supervision and the ability to make cohesive little modules of code, i.e. microservices. Then I want to pass commands and domain objects between these microservices. I want backpressure and supervision at all levels.

Am I misguided in this design? Am I missing something obvious?

On Sat, Dec 27, 2014, 05:12 Piotr Buszka pi...@buszka.eu wrote:
> Hi, I wanted as well to use ActorPublisher/ActorSubscriber pair. [...]
Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]
Hi Mark,

You're completely right, supervision of streams has been on our minds for a long time now. Sadly, you bumped into a not-yet-implemented area of Akka Streams; here's the issue we're talking about: https://github.com/akka/akka/issues/15710

We've had multiple talks on how to apply supervision to streams and think we've prepared the infrastructure for it, yet this exact feature remains to be implemented before we ship a full 1.0 (please remember the current release is still a preview).

Thanks for the feedback and happy hakking!

--
Konrad 'ktoso' Malawski
(sent from my mobile)

On 27 Dec 2014 15:04, Mark van Buskirk markvanbusk...@gmail.com wrote:
> I think I'm clinging to trying to use actors as both publisher and subscriber because I like actor supervision and I don't see how you do supervision with streams. [...]
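For readers arriving later: the feature was not available in the preview discussed here, but later Akka Streams releases expose supervision through stream attributes. The sketch below assumes Akka 2.6 and is not the API of the 1.0-M2 preview; the `SupervisionSketch` object and its decider are hypothetical names for the example. It shows the "resume" behaviour Mark asks about, where a failing element is dropped and the following elements continue.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SupervisionSketch {
  // Decide per exception: skip the failing element, or stop the stream.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision-sketch")
    try {
      // 100 / 0 throws ArithmeticException; with Resume that element is
      // dropped and processing continues with the next one.
      val safeDivide = Flow[Int]
        .map(100 / _)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))

      Await.result(Source(List(1, 0, 5)).via(safeDivide).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that the available directives in this later API are resume, restart and stop; there is no direct equivalent of actor-style escalation to a parent.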