Re: [akka-user] import context.dispatcher nullpointerexception
I agree with Heiko on this one - import context.dispatcher is used everywhere. It is mentioned in the docs not to close over 'context' and access it outside of message handlers, however I think more education is needed here. On Wednesday, March 7, 2018 at 5:35:05 PM UTC-8, Konrad Malawski wrote: > > Yes it is nulled using unsafe. > > -- > Cheers, > Konrad 'ktoso <http://kto.so>' Malawski > Akka <http://akka.io/> @ Lightbend <http://lightbend.com/> > > On March 8, 2018 at 7:32:56, Heiko Seeberger (loe...@posteo.de > ) wrote: > > `import context.dispatcher` is what „everybody“ is doing, not? I’m using > it all over the place, because I have learned that one can import from > stable identifiers (e.g. vals) in Scala. Hence I don’t think changing the > docs has the necessary effect. > > How can a final val be „nulled"? Unsafe? Reflection? > > Cheers > Heiko > > -- > > Heiko Seeberger > Home: heikoseeberger.de > Twitter: @hseeberger > Public key: keybase.io/hseeberger > > > > > Am 07.03.2018 um 19:23 schrieb Patrik Nordwall <patrik@gmail.com > >: > > Thanks! > > On Wed, Mar 7, 2018 at 6:16 PM, Jeff <jknigh...@gmail.com > > wrote: > >> An example of documentation for using import context.dispatcher is here >> https://doc.akka.io/docs/akka/2.5/futures.html#within-actors >> >> I can create some PR to update the documentation >> >> On Wednesday, March 7, 2018 at 6:09:24 AM UTC-8, Patrik Nordwall wrote: >>> >>> It's because when the actor is stopped some of the fields are cleared >>> (yeah, even though they are vals) to "help" the GC in case something (e.g. >>> an local ActorRef) is still referencing the actor instance. >>> >>> implicit val ec = context.dispatcher >>> >>> would solve it here. >>> >>> Where in the documentation is the import recommended? We should probably >>> update that. Would you be able creating a PR fixing it? Thanks. >>> >>> /Patrik >>> >>> On Tue, Mar 6, 2018 at 10:44 PM, Jeff <jknigh...@gmail.com> wrote: >>> >>>> I suspected as much. So what would you suggest as for handling use >>>> cases where you could have chained flatmaps on futures that at the end >>>> will >>>> pipeTo a message back to the actor. Should we set the ExecutionContext to >>>> a >>>> val? >>>> >>>> On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote: >>>>> >>>>> Context is bound to the lifecycle of the actor. >>>>> >>>>> On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com> wrote: >>>>> >>>>>> I have noticed an issue where if a future maps/flatmaps after actor >>>>>> shutdown, a NullPointerException is thrown. I've narrowed it down to >>>>>> the import context.dispatcher, which I technically understand since >>>>>> that is closing over actor state. If I change that to implicit val >>>>>> ec = context.dispatcher, everything works fine. >>>>>> >>>>>> I'd like to understand what is the best practice in this case, since >>>>>> the documentation for context.dispatcher indicates that it is >>>>>> threadsafe and looking at the actor trait, context is a val. Most >>>>>> documentation seems to indicate that import context.dispatcher is >>>>>> preferred. >>>>>> >>>>>> Thanks >>>>>> Jeff >>>>>> >>>>>> -- >>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>> >>>>>>>>>> Check the FAQ: >>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>> >>>>>>>>>> Search the archives: >>>>>> https://groups.google.com/group/akka-user >>>>>> --- >>>>>> You received this message because you are subscribed to the Google >>>>>> Groups "Akka User List" group. >>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>> send an email to akka-user+...@googlegroups.com. >>>>>> To post to this group, send email to akka...@googlegroups.com. >>>>>> Visit this group at https://groups.google.com/group/akka-user. >>>>>> For more options,
Re: [akka-user] import context.dispatcher nullpointerexception
An example of documentation for using import context.dispatcher is here https://doc.akka.io/docs/akka/2.5/futures.html#within-actors I can create some PR to update the documentation On Wednesday, March 7, 2018 at 6:09:24 AM UTC-8, Patrik Nordwall wrote: > > It's because when the actor is stopped some of the fields are cleared > (yeah, even though they are vals) to "help" the GC in case something (e.g. > an local ActorRef) is still referencing the actor instance. > > implicit val ec = context.dispatcher > > would solve it here. > > Where in the documentation is the import recommended? We should probably > update that. Would you be able creating a PR fixing it? Thanks. > > /Patrik > > On Tue, Mar 6, 2018 at 10:44 PM, Jeff <jknigh...@gmail.com > > wrote: > >> I suspected as much. So what would you suggest as for handling use cases >> where you could have chained flatmaps on futures that at the end will >> pipeTo a message back to the actor. Should we set the ExecutionContext to a >> val? >> >> On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote: >>> >>> Context is bound to the lifecycle of the actor. >>> >>> On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com> wrote: >>> >>>> I have noticed an issue where if a future maps/flatmaps after actor >>>> shutdown, a NullPointerException is thrown. I've narrowed it down to >>>> the import context.dispatcher, which I technically understand since >>>> that is closing over actor state. If I change that to implicit val ec >>>> = context.dispatcher, everything works fine. >>>> >>>> I'd like to understand what is the best practice in this case, since >>>> the documentation for context.dispatcher indicates that it is >>>> threadsafe and looking at the actor trait, context is a val. Most >>>> documentation seems to indicate that import context.dispatcher is >>>> preferred. >>>> >>>> Thanks >>>> Jeff >>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> You received this message because you are subscribed to the Google >>>> Groups "Akka User List" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to akka-user+...@googlegroups.com. >>>> To post to this group, send email to akka...@googlegroups.com. >>>> Visit this group at https://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>> >>> >>> >>> -- >>> Cheers, >>> √ >>> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > > Patrik Nordwall > Akka Tech Lead > Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM > Twitter: @patriknw > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] import context.dispatcher nullpointerexception
I suspected as much. So what would you suggest as for handling use cases where you could have chained flatmaps on futures that at the end will pipeTo a message back to the actor. Should we set the ExecutionContext to a val? On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote: > > Context is bound to the lifecycle of the actor. > > On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com > > wrote: > >> I have noticed an issue where if a future maps/flatmaps after actor >> shutdown, a NullPointerException is thrown. I've narrowed it down to the >> import >> context.dispatcher, which I technically understand since that is closing >> over actor state. If I change that to implicit val ec = >> context.dispatcher, everything works fine. >> >> I'd like to understand what is the best practice in this case, since the >> documentation for context.dispatcher indicates that it is threadsafe and >> looking at the actor trait, context is a val. Most documentation seems >> to indicate that import context.dispatcher is preferred. >> >> Thanks >> Jeff >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > Cheers, > √ > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] import context.dispatcher nullpointerexception
I have noticed an issue where if a future maps/flatmaps after actor shutdown, a NullPointerException is thrown. I've narrowed it down to the import context.dispatcher, which I technically understand since that is closing over actor state. If I change that to implicit val ec = context.dispatcher, everything works fine. I'd like to understand what is the best practice in this case, since the documentation for context.dispatcher indicates that it is threadsafe and looking at the actor trait, context is a val. Most documentation seems to indicate that import context.dispatcher is preferred. Thanks Jeff -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Flow.mapAsync and downstream demand
Actually found a much simpler solution using Source.unfoldAsync. On Thursday, February 22, 2018 at 6:54:54 PM UTC-8, Jeff wrote: > > My particular use case is a long poll where the id to be sent on the next > request is returned from the previous request. > > On Thursday, February 22, 2018 at 6:40:48 PM UTC-8, Konrad Malawski wrote: >> >> In general relying on exact timing of pulls is rather seen as an anti >> pattern… >> Even if you do a custom stage, it’s very brittle, as any element put >> before your stage could cause pulls, simply because that’s how things work. >> The same can be said about simply changed buffer sizes between stages — >> introducing an async boundary anywhere in the pipeline can lead >> to more buffer space being allocated, and thus, more pulls being issued. >> >> In other words — due to the fact that backpressure is connected through >> an entire pipeline (and has to, otherwise it would not do what it should), >> you can not rely on exact timing of pulls, because changes in the >> pipeline may affect this. >> >> In your very specific case perhaps it’s doable to make a stage or >> pipeline that’ll do the right thing, with enforcing it with zips and loops, >> but that’ll be specific and take some work and very precise wording of >> what you want to achieve (sample code?) >> >> -- >> Cheers, >> Konrad 'ktoso <http://kto.so>' Malawski >> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/> >> >> On February 23, 2018 at 7:05:18, Jeff (jknigh...@gmail.com) wrote: >> >> Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream >> even if there is no demand from downstream and then buffer. However, there >> are situations where one might want to way for explicit downstream demand >> before pulling. For example, let's say that the next upstream item depends >> on the results of the previous item and that you need to feed that back >> into the upstream before pulling again. In its current form, Flow.mapAsync >> will pull before that feedback loop has been completed. >> >> Is there any way to work around that other than writing a custom >> GraphStage? >> >> Thanks >> Jeff >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com. >> To post to this group, send email to akka...@googlegroups.com. >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> >> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Flow.mapAsync and downstream demand
My particular use case is a long poll where the id to be sent on the next request is returned from the previous request. On Thursday, February 22, 2018 at 6:40:48 PM UTC-8, Konrad Malawski wrote: > > In general relying on exact timing of pulls is rather seen as an anti > pattern… > Even if you do a custom stage, it’s very brittle, as any element put > before your stage could cause pulls, simply because that’s how things work. > The same can be said about simply changed buffer sizes between stages — > introducing an async boundary anywhere in the pipeline can lead > to more buffer space being allocated, and thus, more pulls being issued. > > In other words — due to the fact that backpressure is connected through an > entire pipeline (and has to, otherwise it would not do what it should), > you can not rely on exact timing of pulls, because changes in the pipeline > may affect this. > > In your very specific case perhaps it’s doable to make a stage or pipeline > that’ll do the right thing, with enforcing it with zips and loops, > but that’ll be specific and take some work and very precise wording of > what you want to achieve (sample code?) > > -- > Cheers, > Konrad 'ktoso <http://kto.so>' Malawski > Akka <http://akka.io/> @ Lightbend <http://lightbend.com/> > > On February 23, 2018 at 7:05:18, Jeff (jknigh...@gmail.com ) > wrote: > > Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream > even if there is no demand from downstream and then buffer. However, there > are situations where one might want to way for explicit downstream demand > before pulling. For example, let's say that the next upstream item depends > on the results of the previous item and that you need to feed that back > into the upstream before pulling again. In its current form, Flow.mapAsync > will pull before that feedback loop has been completed. > > Is there any way to work around that other than writing a custom > GraphStage? > > Thanks > Jeff > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to akka-user+...@googlegroups.com . > To post to this group, send email to akka...@googlegroups.com > . > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Flow.mapAsync and downstream demand
Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream even if there is no demand from downstream and then buffer. However, there are situations where one might want to way for explicit downstream demand before pulling. For example, let's say that the next upstream item depends on the results of the previous item and that you need to feed that back into the upstream before pulling again. In its current form, Flow.mapAsync will pull before that feedback loop has been completed. Is there any way to work around that other than writing a custom GraphStage? Thanks Jeff -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Single Writer/Multiple Reader and anonymous actors
Bumping this - what are the best practices about using anonymous actors to modify shared state? Thanks Jeff On Wednesday, December 27, 2017 at 4:51:04 PM UTC-8, Jeff wrote: > > I have a class that is read heavy with low contention on writes. I'd like > to be able to access a piece of shared mutable state outside of Actors. The > state itself (AnnouncementRegistry) is immutable. Since it is a single > writer, having a shared @volatile var between the actor and the class seems > to be a good approach, however it also has a code smell to it since I am > closing over external mutable state and using an anonymous actor. Is there > a better approach to what I have below? > > class DiscoveryClient(config: DiscoveryConfig, singleRequest: HttpRequest => > Future[HttpResponse]) > (implicit system: ActorSystem, m: Materializer) { > > @volatile > private[this] var _registry = AnnouncementRegistry.empty > > def registry = _registry > > private[this] val client = system.actorOf(Props(new Actor with ActorLogging > { > import DiscoveryClient._ > > private[this] var subscribers = HashSet.empty[ActorRef] > > private[this] val killswitch = > ChangeWatcher(config.seeds, singleRequest).to(Sink.foreach(self ! > RegistryUpdate(_))).run() > > def receive: Receive = uninitialized > > private[this] def uninitialized: Receive = { > case Subscribe => > subscribers += sender > context.watch(sender) > > case Terminated(ref) => > subscribers -= ref > > case m @ RegistryUpdate(newRegistry) => > _registry = newRegistry > context.become(initialized) > subscribers.foreach(_ ! m) > } > > private[this] def initialized: Receive = { > case Subscribe => > subscribers += sender > context.watch(sender) > sender ! RegistryUpdate(_registry) > > case Terminated(ref) => > subscribers -= ref > > case m @ RegistryUpdate(newRegistry) => > _registry = newRegistry > subscribers.foreach(_ ! m) > } > > override def postStop(): Unit = killswitch.shutdown() > }), "discovery-client") > } > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Single Writer/Multiple Reader and anonymous actors
I have a class that is read heavy with low contention on writes. I'd like to be able to access a piece of shared mutable state outside of Actors. The state itself (AnnouncementRegistry) is immutable. Since it is a single writer, having a shared @volatile var between the actor and the class seems to be a good approach, however it also has a code smell to it since I am closing over external mutable state and using an anonymous actor. Is there a better approach to what I have below? class DiscoveryClient(config: DiscoveryConfig, singleRequest: HttpRequest => Future[HttpResponse]) (implicit system: ActorSystem, m: Materializer) { @volatile private[this] var _registry = AnnouncementRegistry.empty def registry = _registry private[this] val client = system.actorOf(Props(new Actor with ActorLogging { import DiscoveryClient._ private[this] var subscribers = HashSet.empty[ActorRef] private[this] val killswitch = ChangeWatcher(config.seeds, singleRequest).to(Sink.foreach(self ! RegistryUpdate(_))).run() def receive: Receive = uninitialized private[this] def uninitialized: Receive = { case Subscribe => subscribers += sender context.watch(sender) case Terminated(ref) => subscribers -= ref case m @ RegistryUpdate(newRegistry) => _registry = newRegistry context.become(initialized) subscribers.foreach(_ ! m) } private[this] def initialized: Receive = { case Subscribe => subscribers += sender context.watch(sender) sender ! RegistryUpdate(_registry) case Terminated(ref) => subscribers -= ref case m @ RegistryUpdate(newRegistry) => _registry = newRegistry subscribers.foreach(_ ! m) } override def postStop(): Unit = killswitch.shutdown() }), "discovery-client") } -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: RestartSource/Flow/Sink practical examples
Still trying to figure out how I can use the new RestartSink/Flow/Source to retry an Http request in a Stream. Any thoughts? On Thursday, October 5, 2017 at 12:29:07 PM UTC-7, Jeff wrote: > > I am trying to create a polling flow which feeds the results of the last > update into the next update (much like the SSE example here > https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential > backoff on failed requests. The new RestartSource/Flow/Sink seems like the > correct fit, however I can't seem to get it to work in practice. The > strawman example I have is > > val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex => > > val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => { > Flow[Long].mapAsyncUnordered(1) { lastIndex => > val p = Promise[WatchResult] > pool ! HAConnectionPool.Watch(lastIndex, p) > p.future > } > }) > > Source.repeat(lastIndex).via(request).runWith(Sink.head) > } > > > The problem with this however is that mapAsyncUnordered() will always > pull the upstream when the future completes, even if there is no downstream > demand, causing 2 requests to be made instead of one. I thought about using > RestartSource, but it appears that it will keep resetting the > Source.single on completeStage(), which is undesirable. > > Thoughts? > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: RestartSource/Flow/Sink practical examples
bump On Thursday, October 5, 2017 at 12:29:07 PM UTC-7, Jeff wrote: > > I am trying to create a polling flow which feeds the results of the last > update into the next update (much like the SSE example here > https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential > backoff on failed requests. The new RestartSource/Flow/Sink seems like the > correct fit, however I can't seem to get it to work in practice. The > strawman example I have is > > val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex => > > val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => { > Flow[Long].mapAsyncUnordered(1) { lastIndex => > val p = Promise[WatchResult] > pool ! HAConnectionPool.Watch(lastIndex, p) > p.future > } > }) > > Source.repeat(lastIndex).via(request).runWith(Sink.head) > } > > > The problem with this however is that mapAsyncUnordered() will always > pull the upstream when the future completes, even if there is no downstream > demand, causing 2 requests to be made instead of one. I thought about using > RestartSource, but it appears that it will keep resetting the > Source.single on completeStage(), which is undesirable. > > Thoughts? > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] RestartSource/Flow/Sink practical examples
I am trying to create a polling flow which feeds the results of the last update into the next update (much like the SSE example here https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential backoff on failed requests. The new RestartSource/Flow/Sink seems like the correct fit, however I can't seem to get it to work in practice. The strawman example I have is val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex => val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => { Flow[Long].mapAsyncUnordered(1) { lastIndex => val p = Promise[WatchResult] pool ! HAConnectionPool.Watch(lastIndex, p) p.future } }) Source.repeat(lastIndex).via(request).runWith(Sink.head) } The problem with this however is that mapAsyncUnordered() will always pull the upstream when the future completes, even if there is no downstream demand, causing 2 requests to be made instead of one. I thought about using RestartSource, but it appears that it will keep resetting the Source.single on completeStage(), which is undesirable. Thoughts? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-streams] Shared Materializer in akka extensions
A Consul like service discovery client built on top of akka-http -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-streams] Shared Materializer in akka extensions
That's what I was leaning towards as well, since materialzers are fairly lightweight. However, I've seen that statement prefaced with, "...right now" so I wanted to make sure there wasn't some other way to share. -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-streams] Shared Materializer in akka extensions
How, exactly. You only have access to an ExtendedActorSystem inside of createExtension() -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-streams] Shared Materializer in akka extensions
I am building an akka extension, but there doesn't seem to be an elegant way to pass in a shared Materializer. What would be the suggested way to use a shared materializer between akka extensions and the main application? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: [akka-http] Http().superPool() and MergeHub.source backpressure
I'm embarrassed I forgot something so obvious. Much thanks! On Tuesday, August 15, 2017 at 8:50:27 AM UTC-7, johannes...@lightbend.com wrote: > > Hi Jeff, > > if you don't read the response bodies of all the responses, your pipeline > will stall because the super pool connection are still waiting for your > code to actually read the responses. In your example, try to add > `x.discardEntityBytes` (or actually read the entity) inside of the > `Sink.foreach` block. > > See > http://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html > > for more information on that topic. > > Johannes > > On Thursday, August 10, 2017 at 1:53:18 AM UTC+2, Jeff wrote: >> >> I am getting behavior that I do not understand combining >> Http().superPool() with MergeHub.source. Here is a sample application >> >> import akka.actor.ActorSystem >> import akka.http.scaladsl.Http >> import akka.http.scaladsl.model.{HttpRequest, Uri} >> import akka.stream.{ActorMaterializer, ThrottleMode} >> import akka.stream.scaladsl.{MergeHub, Sink, Source} >> import com.typesafe.config.ConfigFactory >> >> import scala.concurrent.duration._ >> >> object Main extends App { >> val config = ConfigFactory.load() >> >> implicit val system = ActorSystem("test-system", config) >> implicit val executionContext = system.dispatcher >> implicit val materializer = ActorMaterializer() >> >> val request = HttpRequest(uri = Uri("http://www.google.com;)) >> >> val sink = MergeHub.source[(HttpRequest, Int)] >> .throttle(1, 1.second, 1, ThrottleMode.shaping) >> .map { x => >> println(s"started ${x._2}") >> x >> } >> .via(Http().superPool()) >> .to(Sink.foreach { x => >> println(s"finished ${x._2}") >> }).run() >> >> for (x <- 1 to 100) sink.runWith(Source.single(request -> x)) >> } >> >> >> The output of which is >> >> started 2 >> finished 2 >> started 3 >> finished 3 >> started 4 >> finished 4 >> started 1 >> finished 1 >> started 5 >> started 6 >> started 7 >> started 8 >> >> My understanding from the documentation is that Http().superPool() will >> backpressure. However, after 8 iterations, nothing else happens. Thoughts? >> >> Thanks >> Jeff >> > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: [akka-http] Http().superPool() and MergeHub.source backpressure
Quickly following up on this. My understanding of how MergeHub works leads me to believe that all 100 requests in my example should fire, however only 8 of them do, and only 4 complete. On Wednesday, August 9, 2017 at 4:53:18 PM UTC-7, Jeff wrote: > > I am getting behavior that I do not understand combining > Http().superPool() with MergeHub.source. Here is a sample application > > import akka.actor.ActorSystem > import akka.http.scaladsl.Http > import akka.http.scaladsl.model.{HttpRequest, Uri} > import akka.stream.{ActorMaterializer, ThrottleMode} > import akka.stream.scaladsl.{MergeHub, Sink, Source} > import com.typesafe.config.ConfigFactory > > import scala.concurrent.duration._ > > object Main extends App { > val config = ConfigFactory.load() > > implicit val system = ActorSystem("test-system", config) > implicit val executionContext = system.dispatcher > implicit val materializer = ActorMaterializer() > > val request = HttpRequest(uri = Uri("http://www.google.com;)) > > val sink = MergeHub.source[(HttpRequest, Int)] > .throttle(1, 1.second, 1, ThrottleMode.shaping) > .map { x => > println(s"started ${x._2}") > x > } > .via(Http().superPool()) > .to(Sink.foreach { x => > println(s"finished ${x._2}") > }).run() > > for (x <- 1 to 100) sink.runWith(Source.single(request -> x)) > } > > > The output of which is > > started 2 > finished 2 > started 3 > finished 3 > started 4 > finished 4 > started 1 > finished 1 > started 5 > started 6 > started 7 > started 8 > > My understanding from the documentation is that Http().superPool() will > backpressure. However, after 8 iterations, nothing else happens. Thoughts? > > Thanks > Jeff > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-http] Http().superPool() and MergeHub.source backpressure
I am getting behavior that I do not understand combining Http().superPool() with MergeHub.source. Here is a sample application import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, Uri} import akka.stream.{ActorMaterializer, ThrottleMode} import akka.stream.scaladsl.{MergeHub, Sink, Source} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ object Main extends App { val config = ConfigFactory.load() implicit val system = ActorSystem("test-system", config) implicit val executionContext = system.dispatcher implicit val materializer = ActorMaterializer() val request = HttpRequest(uri = Uri("http://www.google.com;)) val sink = MergeHub.source[(HttpRequest, Int)] .throttle(1, 1.second, 1, ThrottleMode.shaping) .map { x => println(s"started ${x._2}") x } .via(Http().superPool()) .to(Sink.foreach { x => println(s"finished ${x._2}") }).run() for (x <- 1 to 100) sink.runWith(Source.single(request -> x)) } The output of which is started 2 finished 2 started 3 finished 3 started 4 finished 4 started 1 finished 1 started 5 started 6 started 7 started 8 My understanding from the documentation is that Http().superPool() will backpressure. However, after 8 iterations, nothing else happens. Thoughts? Thanks Jeff -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Flow.fromSinkAndSource and backpressure
I didn't see much in the documentation, so I thought I'd ask. If I have a Sink.actorRef and a Source.actorRef, and I combine them using Flow.fromSinkAndSource, does this somehow propagate backpressure? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: [akka-streams] Generic streams and abstract types
My understanding from the documentation is that using Http.singleRequest will fail requests when in a backpressure situation, while using a MergeHub with the flow will in fact backpressure. Is that a correct understanding? Jeff On Thursday, July 13, 2017 at 4:46:39 AM UTC-7, johannes...@lightbend.com wrote: > > On Wednesday, July 12, 2017 at 9:08:52 PM UTC+2, Jeff wrote: >> >> As for the issue of complexity, it's actually not as complex as it >> sounds. I'm using Http().superPool() to make api requests and I wanted to >> avoid having to create a separate stream for every single iteration of api >> request when the only thing that changed was the Unmarshaller. Instead of >> materializing multiple streams where the only thing that changed was the >> Sink, I just created one stream where the Sink.foreach(...) take the >> Unmarshaller function and resolves the Promise. >> > > You could just use Http.singleRequest in that case because it implements > almost the same kind of logic and also uses the same pool as superPool. > > Johannes > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: [akka-streams] Generic streams and abstract types
Thanks for the great suggestions - I eventually came to the "custom tuple" solution myself and it seems to work well. As for the issue of complexity, it's actually not as complex as it sounds. I'm using Http().superPool() to make api requests and I wanted to avoid having to create a separate stream for every single iteration of api request when the only thing that changed was the Unmarshaller. Instead of materializing multiple streams where the only thing that changed was the Sink, I just created one stream where the Sink.foreach(...) take the Unmarshaller function and resolves the Promise. On Wednesday, July 12, 2017 at 5:24:07 AM UTC-7, johannes...@lightbend.com wrote: > > Hi Jeff, > > your API seems quite complex. I don't know the purpose of it so I cannot > suggest anything but I'd try to simplify. :) > > That said, your problem seems to be that you cannot write a concrete type > that would express the dependency between the two components of the tuple > `(RequestBuilder, Promise[???])`. There are two ways to solve it: > > 1) make `Out` a type parameter and convert `sink` to a `def sink[Out]`, > then you can use the tuple `(RequestBuilder[Out], Promise[Out])` > 2) create your custom tuple type that allows to express the dependency: > > case class BuilderWithPromise[O](builder: RequestBuilder { type Out = O }, > promise: Promise[O]) > > and then use it as MergeHub.source[BuilderWithPromise[_]] > > But I can only repeat that getting rid of the dependent types if possible > is usually the best solution ;) > > Johannes > > On Thursday, July 6, 2017 at 11:23:50 PM UTC+2, Jeff wrote: >> >> Here is a strawman program which illustrates the issue I am having >> >> trait RequestBuilder { >> type Out >> >> def complete(p: Promise[Out]): Unit >> } >> >> def makeRequest(in: RequestBuilder): Source[(RequestBuilder, >> Promise[in.Out]), Future[in.Out]] = { >> val p = Promise[in.Out] >> >> Source.single(in -> p).mapMaterializedValue(_ => p.future) >> } >> >> val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach { >> case (r, p) => r.complete(p) >> }).run() >> >> sink.runWith(makeRequest(new RequestBuilder { >> type Out = Int >> >> def complete(p: Promise[Out]): Unit = p.success(1) >> })) >> >> >> The issue is, how do I type the Promise[???] in the sink? I have been >> able to work around this by making the Promise a part of the RequestBuilder >> trait itself, but this seems like a code smell to me >> > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: [akka-streams] Generic streams and abstract types
Any thoughts? On Thursday, July 6, 2017 at 2:23:50 PM UTC-7, Jeff wrote: > > Here is a strawman program which illustrates the issue I am having > > trait RequestBuilder { > type Out > > def complete(p: Promise[Out]): Unit > } > > def makeRequest(in: RequestBuilder): Source[(RequestBuilder, > Promise[in.Out]), Future[in.Out]] = { > val p = Promise[in.Out] > > Source.single(in -> p).mapMaterializedValue(_ => p.future) > } > > val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach { > case (r, p) => r.complete(p) > }).run() > > sink.runWith(makeRequest(new RequestBuilder { > type Out = Int > > def complete(p: Promise[Out]): Unit = p.success(1) > })) > > > The issue is, how do I type the Promise[???] in the sink? I have been > able to work around this by making the Promise a part of the RequestBuilder > trait itself, but this seems like a code smell to me > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-streams] Generic streams and abstract types
Here is a strawman program which illustrates the issue I am having trait RequestBuilder { type Out def complete(p: Promise[Out]): Unit } def makeRequest(in: RequestBuilder): Source[(RequestBuilder, Promise[in.Out]), Future[in.Out]] = { val p = Promise[in.Out] Source.single(in -> p).mapMaterializedValue(_ => p.future) } val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach { case (r, p) => r.complete(p) }).run() sink.runWith(makeRequest(new RequestBuilder { type Out = Int def complete(p: Promise[Out]): Unit = p.success(1) })) The issue is, how do I type the Promise[???] in the sink? I have been able to work around this by making the Promise a part of the RequestBuilder trait itself, but this seems like a code smell to me -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: akka streams with http - downstream error propagation without collapsing the stream
I am also running into a similar situation and have not found a solution that is satisfactory On Monday, May 30, 2016 at 2:03:55 PM UTC-7, Matthew wrote: > > Hi Leslie, > > Did you ever find a nice pattern for this? I ask because I'm doing > something very similar and am currently unhappy with the error handling > capabilities of Akka streams... > > On Monday, July 13, 2015 at 9:50:10 AM UTC-4, leslie...@googlemail.com > wrote: >> >> I've tried to implement a straight forward http flow using akka streams >> 1.0-RC4. It's just about getting some http/json input which is >> transformed/evaluated, gets some more info via akka-http, transformed >> evaluated and finally outputs to json http response. >> For the optimistic case - where no errors occur - its quite simple and >> straight forward. >> >> Thinking about the error handling propagation makes it quite complicated >> to me. Especially when keeping the stream running while don't swallow >> errors silently. They should appear in the final Sink stage wrapped into a >> Try or Either object. To make it worse nearly every stage could run into >> some kind of error/exception case. >> I thought about some options which seam not really promising to me like: >> >> 1. For each stage just propagate a Try or Either object to the output, >> like HttpExt.superPool flow does. >> -would need to pass data through all stages of the stream even if the >> first stage failed already. >> -each flow stage would need a Try/Either input than in order to handle >> the upstream success/error case. >> -at least HttpExt.superPool flow stage would not accept Try/Either >> input so the flow needs to be forked here >> -it seems quite complicated to me to make every flow stage a graph with >> two outputs (one for success flowing into the next stage and one for error >> flowing into the final stage) >> >> 2. Exception handling: >> -just throwing exceptions collapses the stream which is definitely not >> what I want >> - (misuse?) the supervision decider for error handling, but I have no >> access to the final Sink stage here >> -when using decider exception handler, I would need to add it either to >> each stage separately or globally to the materializer (can I have on per >> stream or is it to heavy weight?, than this would be ok. but still no >> access to the Sink) >> -when using decider exception handler, I still would need to add >> exception handling code to all stages due to the request context Promise >> which I need to pass through all flow stages in order to fulfill the >> incoming request in the sink. There needs to be some way in order to pass >> this Promise request context to the decider. >> >> Currently I use the supervision decider solution. I intercept all flow >> stage operations. In case of exceptions I put the original exception >> together with the request context Promise into a special exception and let >> it throw to the decider where I can complete the request context Promise >> with failure and resume the flow. >> >> It looks somehow not right to me but I run out of ideas so far. >> >> Regards, >> Leslie >> >> ++ >> |http/json in+-+ >> +-+--+ | >> || >> v| >> +-+--+ | >> |several processing steps+-+ >> +-+--+ | >> || >> v| >> +-+---+| >> |akka http++ >> +-+---+| >> || >> v| >> +-+--+ | >> |several processing steps+-+ >> +-+--+ | >> ||error / exception >> v|propagation? >> +-+---+| >> |akka http++ >> +-+---+| >> || >> v| >> +-+--+ | >> |several processing steps+-+ >> +-+--+ | >> || >> vv >> +-++--+ >> |final http/json out (succes or error)| >> +-+ >> > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to
[akka-user] Re: [akka-http] Tagless Final and ToResponseMarshallable
That gave me a lot of inspiration, much thanks. I've adapted that into the following pattern: abstract class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) extends MonadToResponseMarshaller[F] { import algebra._ def route: Route = path("api" / "v4.0" / "restaurant" / IntNumber) { rid => get { complete(getRestaurantById(rid).map(_.toString)) } } } trait MonadToResponseMarshaller[F[_]] { implicit def monadMarshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]] } trait FutureOfOptionToResponseMarshaller { this: MonadToResponseMarshaller[FutureOfOption] => import CatsMarshallers._ def monadMarshaller[A: ToResponseMarshaller]: ToResponseMarshaller[FutureOfOption[A]] = implicitly } trait Routes { import cats.instances.future._ def routes(implicit ec: ExecutionContext): Route = { val restaurant = new RestaurantRoutes(new RestaurantFutureInterpreter()) with FutureOfOptionToResponseMarshaller restaurant.route } } trait CatsMarshallers { implicit def optionTMarshaller[F[_], A, B](implicit m: Marshaller[F[Option[A]], B]): Marshaller[OptionT[F, A], B] = Marshaller { implicit ec => v => m(v.value) } } object CatsMarshallers extends CatsMarshallers This seems fairly clean and composable, but I'm wondering if there's a way to prevent having to mix-in FutureOfOptionToResponseMarshaller every time. On Tuesday, June 13, 2017 at 10:30:53 PM UTC-7, Juan José Vázquez Delgado wrote: > > I've faced a similar situation recently working on this example > <https://github.com/juanjovazquez/scala-petclinic>. It's not easy but is > definitely feasible. The bottom line is: run your own marshaller. > > El martes, 13 de junio de 2017, 18:44:01 (UTC+2), Jeff escribió: >> >> I am trying to learn tagless final style to remove concrete monads (ie >> futures) from my code. I have come up with the following strawman >> application (using cats) >> >> class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) { >> import algebra._ >> >> def route: Route = >> path("api" / "v4.0" / "restaurant" / IntNumber) { rid => >> get { >> complete(getRestaurantById(rid).map(_.toString).value) >> } >> } >> } >> >> object RestaurantRoutes { >> def apply()(implicit ec: ExecutionContext): >> RestaurantRoutes[FutureOfOption] = { >> import cats.instances.future._ >> >> new RestaurantRoutes(new RestaurantFutureInterpreter()) >> } >> >> } >> >> the final type of >> >> getRestaurantById(rid).map(_.toString).value >> >> is a >> >> Future[Option[String]] >> >> However, I can't seem to figure out to get that marshalled. A marshaller >> exists for all those types, and had I had concrete types intstead of F[_], >> it would have worked. What is the best way to handle this to prevent >> implementation details from leaking in? >> >> Thanks >> Jeff >> > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-http] Tagless Final and ToResponseMarshallable
It's not even limited to Futures - in this case, how do I marshall any content in this situation when the type is parameterized? On Tuesday, June 13, 2017 at 12:50:40 PM UTC-7, Justin du coeur wrote: > > Good luck -- I've been doing a lot with tagless final in my Akka Sharded / > Persistent code, and have the beginnings of a library for that > <https://github.com/jducoeur/Querki/tree/master/querki/scalajvm/app/funcakka>, > > but I've found that *completely* doing away with the Futures is pretty > challenging. I've gotten it most of the way there, but I still have to > cheat occasionally... > > On Tue, Jun 13, 2017 at 12:44 PM, Jeff <jknigh...@gmail.com > > wrote: > >> I am trying to learn tagless final style to remove concrete monads (ie >> futures) from my code. I have come up with the following strawman >> application (using cats) >> >> class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) { >> import algebra._ >> >> def route: Route = >> path("api" / "v4.0" / "restaurant" / IntNumber) { rid => >> get { >> complete(getRestaurantById(rid).map(_.toString).value) >> } >> } >> } >> >> object RestaurantRoutes { >> def apply()(implicit ec: ExecutionContext): >> RestaurantRoutes[FutureOfOption] = { >> import cats.instances.future._ >> >> new RestaurantRoutes(new RestaurantFutureInterpreter()) >> } >> >> } >> >> the final type of >> >> getRestaurantById(rid).map(_.toString).value >> >> is a >> >> Future[Option[String]] >> >> However, I can't seem to figure out to get that marshalled. A marshaller >> exists for all those types, and had I had concrete types intstead of F[_], >> it would have worked. What is the best way to handle this to prevent >> implementation details from leaking in? >> >> Thanks >> Jeff >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-http] Tagless Final and ToResponseMarshallable
I am trying to learn tagless final style to remove concrete monads (ie futures) from my code. I have come up with the following strawman application (using cats) class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) { import algebra._ def route: Route = path("api" / "v4.0" / "restaurant" / IntNumber) { rid => get { complete(getRestaurantById(rid).map(_.toString).value) } } } object RestaurantRoutes { def apply()(implicit ec: ExecutionContext): RestaurantRoutes[FutureOfOption] = { import cats.instances.future._ new RestaurantRoutes(new RestaurantFutureInterpreter()) } } the final type of getRestaurantById(rid).map(_.toString).value is a Future[Option[String]] However, I can't seem to figure out to get that marshalled. A marshaller exists for all those types, and had I had concrete types intstead of F[_], it would have worked. What is the best way to handle this to prevent implementation details from leaking in? Thanks Jeff -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Akka Dispatchers that fail fast instead of enqueuing?
Hello akka-user list, I have had a recent experience of being surprised by the APIs in akka surrounding streaming HTTP. I would like some feedback on the solution that I have converged on. I'll begin with the use case: - Modules akka-actor, akka-stream, and akka-http-core are used to build a streaming data application. - The application ingests data via a purpose-built agent running on an upstream service. Upstream agent pushes HTTP POST requests encoded, for efficiency, in raw bytes. - Latency between upstream agent and service is low, so only a few inbound TCP connections will saturate the 10Gb/s NIC. - In order to ensure at least once delivery, agent is configured to retry all POSTS for which the service does not respond with status 200 (OK). - In order to ensure at least once delivery, service is configured to sync disk before responding with HTTP status 200 (OK). - Parallel instances of the agent partition the data before transmission. Given these constraints, it becomes clear that we need to limit the number of inbound connections. Only a few are necessary to saturate the connection, and more only increase the CPU and memory used. As an implementation that makes sense to me, HTTP status 503 can be issued to the agent upstream when maximum parallelism has been reached. In order to minimize disorder, the agent should be notified as soon as possible of the busy node so the agent can proceed with retry to a different node. Note that since the input is streaming to disk, an akka-streams ActorMaterializer as well as the http actor must have its parallelism limited. Once ActorMaterializer gets a flow started, it seems tricky to get it to stop efficiently. When I began with the problem, it appeared as very likely that one of the many implementations of dispatcher could provide this behavior. There is much language about "bounded mailboxes" and etc. My three attempts to solve this problem went like this: 1) Try to find some Typesafe config, .withDispatcher and related calls to bound the parallelism of the actor and ActorMaterializer. This started taking a long time, and all configurations found were enqueuing requests instead of failing. 2) I wrote a custom subclass of ThreadPoolExecutor with a hard maximum thread count. I wrapped it from ExecutionContext.fromExecutor and called my HTTP handler. This approach failed when I realized that even though the actor and ActorMaterializer were gated by my counter, my counter was decremented too soon. Remember, I only have to have say 3 connections, so 3 threads are very cheap. I had hoped Async.result could sit and keep a thread running for a connection. But as it turns out, even though all the non-blocking bloggers out there say to not call Async.result, I couldn't get Async.result or any related API to hold a thread! It must be doing something clever. . . . 3) Finally, I abandoned the Dispatcher and ExecutionContext APIs entirely, in favor of a simple Future: import scala.concurrent.{ExecutionContext, Promise, Future} import scala.util.Try class FutureSemaphore(numConcurrentMax : Int, ec : ExecutionContext) { implicit val _ec = ec var numConcurrent = 0 def submit[U](fnf : Unit => Future[U]) : Option[Future[U]] = { if(numConcurrent < numConcurrentMax) { synchronized { numConcurrent = numConcurrent + 1 } val f = fnf() val g = Promise[U]() f.onComplete{(t:Try[U]) => { synchronized { numConcurrent = numConcurrent - 1 } g.complete(t) }} Some(g.future) } else None } } While the code is small and seems to work as advertised, I am still confused about why this took too long to figure out. Did I miss something in the documentation? Here is the reference documentation: 3.4 Dispatchers An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of the machine so to speak. All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures. If it is true that dispatchers cannot bound parallelism, it seems like basic design information and the reference should say so. If alternatively, some clever configuration of a dispatcher subclass can bound parallelism and fail fast, then the configuration should be shown. Maybe I missed it. I dunno. Thanks in advance for any insights. Regards, Jeff Henrikson -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You recei
[akka-user] [akka-streams] Graph and Stream of Streams
I am working my way through the websocket example and have a question about nested streams. Currently I have this: import akka.actor.ActorRef import akka.http.scaladsl.model.ws.{BinaryMessage, TextMessage, Message} import akka.stream.{OverflowStrategy, FlowShape} import akka.stream.scaladsl.{Sink, Source, FlowGraph, Flow} object ChatFlow { def apply(userId: Long, channel: Long, rabbitActor: ActorRef): Flow[Message, Message, ActorRef] = Flow.fromGraph(FlowGraph.create(Source.actorRef[RabbitActor.Message](5, OverflowStrategy.fail)) { implicit b => rabbitSource => import FlowGraph.Implicits._ // for now, ignore all content coming from the websocket val fromWebsocket = b.add(Sink.foreach[Message] { case TextMessage.Strict(content) => println(content) // case bm: BinaryMessage => bm.dataStream.runWith(Sink.ignore) }) // translate rabbit messages into socket messages val toWebSocket = b.add(Flow[RabbitActor.Message] map { case _ => TextMessage(s"test") }) // when this stream is materialized, register it with rabbitmq val websocketMaterialized = b.materializedValue map { actor => RabbitActor.UserJoined(userId, channel, actor) } val rabbitMQSink = Sink.actorRef[RabbitActor.Message](rabbitActor, RabbitActor.UserLeft(userId, channel)) websocketMaterialized ~> rabbitMQSink rabbitSource ~> toWebSocket FlowShape(fromWebsocket.inlet, toWebSocket.outlet) }) } My question is about the commented out BinaryMessage. bm.dataStream is a Source, and the docs say I should drain this stream to clear the socket. How would I do that in this scenario without passing in a materializer? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly
Any solutions to this one yet? I'm getting the same error in a Play 2.4 app. During Global.onStop I call my subscribers onComplete function, but I still see the Processor actor terminated abruptly error. Any ideas? On Thursday, April 30, 2015 at 2:55:32 PM UTC-6, Samuel Tardieu wrote: Hi. With akka http 1.0-RC2 (was similar in 1.0-RC1), one of my program signals an intermittent error that I do not understand. The context: one HTTP GET request is sent out, the JSON response is properly received, decoded and acted upon, and then the system terminates with (system being my ActorSystem): Await.ready(Http().shutdownAllConnectionPools(), 5.seconds) system.shutdown() The error only happens from time to time (I’d say once every three runs). I have activated akka.actor.debug.{receive,autoreceive,lifecycle,fsm}. Any idea of what could be the cause? INFO] [04/30/2015 22:36:01.916] [run-main-0] [ActorSystem(default)] Initiating orderly shutdown of all active host connections pools... [DEBUG] [04/30/2015 22:36:01.921] [default-akka.actor.default-dispatcher-8] [akka://default/system/deadLetterListener] stopped [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] [akka://default/user/$a/flow-2-3-publisherSource-processor-mapConcat] stopped [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-6] [akka://default/user/$a/flow-2-9-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.retryMerge-flexiMerge] stopped [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] [akka://default/user/$a/flow-2-2-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-2-4-publisherSource-Merge] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/flow-2-1-publisherSource-Merge] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-10] [akka://default/user/$a/flow-2-11-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.RetrySplit-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-2-12-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/flow-2-14-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-10] [akka://default/user/$a/flow-2-5-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-2-13-publisherSource-processor-mapConcat] stopped [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/flow-2-15-publisherSource-processor-mapConcat] stopped [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-10] [akka://default/user/$a/flow-2-6-publisherSource-processor-mapConcat] stopped [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-3] [akka://default/user] stopping [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-4] [akka://default/user/SlotProcessor-0] stopped [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-14] [akka://default/user/SlotProcessor-3] stopped [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-10] [akka://default/user/SlotProcessor-1] stopped [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-12] [akka://default/user/SlotProcessor-2] stopped [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-3-16-actorPublisherSource-actorSubscriberSink] stopped [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-13] [akka://default/user/$a/flow-2-8-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.Route-flexiRoute] stopped [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-11] [akka://default/user/$a/flow-2-7-publisherSource-PoolConductor.retryMerge-flexiMerge-mapConcat] stopped [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-8] [akka://default/user/PoolInterfaceActor-0] stopped [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-9] [akka://default/user/$a/flow-2-10-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.SlotSelector-flexiMerge] stopped [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/flow-3-0-actorPublisherSource-actorPublisherSource] stopped [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-9]
Re: [akka-user] Promises and message passing
If everything is local an no serialization is required, would it be worth it? From the documentation on the website, it indicates that Ask has a performance cost associated with it. On Wednesday, July 15, 2015 at 8:02:57 PM UTC-7, √ wrote: Because you can't serialize a promise (if the actor decides to send the message to another node) On Thu, Jul 16, 2015 at 4:51 AM, Jeff jknigh...@gmail.com javascript: wrote: Is there any reason why I wouldn't want to pass a Promise to a (local) ActorRef to be resolved? I know the Ask pattern exists, but I would like to avoid the cost of having to allocate PromiseActorRefs. Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Promises and message passing
Is there any reason why I wouldn't want to pass a Promise to a (local) ActorRef to be resolved? I know the Ask pattern exists, but I would like to avoid the cost of having to allocate PromiseActorRefs. Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Tracking down Future timeout failures in a cluster under load
Hello, I am currently in the process of evaluating Akka. I have successfully built a proof of concept REST API using akka-http which is part of an Akka cluster system that has a number of Actors in it. Everything works well and I'm happy with what I am seeing until I try to ramp up the load on the REST API server and then I get the server to a point where the requests start failing with a Future timed out message. My problem is that I am not sure where to find out *why* and *where* the request are timing out. As I understand Akka, the timeout is simply a scheduler in the Future saying that I have exceeded the amount of time that I should wait for the Actor to return something. As such I am not sure how I would be able to find out at exactly what point the Actor I called was in processing my request when the timeout occurred. Can anyone give me some advice on how I should be tracking down this issue? Thanks in advance, Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause
So if my understanding is correct, expected-response-after and acceptable-heartbeat-pause do the exact same thing, the only difference is when they kick in. expected-response-after is for the first heartbeat, and accetable-heartbeat-pause is for every subsequent heartbeat. Is my understanding correct? Thanks Jeff On Monday, June 15, 2015 at 7:40:35 AM UTC-7, Patrik Nordwall wrote: expected-response-after is used for starting failure detection when the first heartbeat message is not received as expected, e.g. when connection is broken from the beginning. acceptable-heartbeat-pause is explained here: http://doc.akka.io/docs/akka/2.3.11/scala/cluster-usage.html#Failure_Detector /Patrik On Fri, Jun 5, 2015 at 12:41 AM, Jeff jknigh...@gmail.com javascript: wrote: Can somebody enlighten me on how these two settings interact? These two settings seem to overlap. Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause
Sure, ticket is here https://github.com/akka/akka/issues/17750 On Tuesday, June 16, 2015 at 12:51:56 PM UTC-7, Patrik Nordwall wrote: On Tue, Jun 16, 2015 at 8:51 PM, Jeff jknigh...@gmail.com javascript: wrote: So if my understanding is correct, expected-response-after and acceptable-heartbeat-pause do the exact same thing, the only difference is when they kick in. expected-response-after is for the first heartbeat, and accetable-heartbeat-pause is for every subsequent heartbeat. Is my understanding correct? expected-response-after is added on top of the acceptable-heartbeat-pause for the first heartbeat. Now when we discuss it I can't really find a good motivation for that delay, at least not as long as the default 5 seconds. Perhaps it is a leftover from when we used one-way heartbeating, instead of current request-response hearbeating. Would you mind creating a github issue https://github.com/akka/akka/issues and we will investigate if that delay can be removed, or reduced. Thanks, Patrik Thanks Jeff On Monday, June 15, 2015 at 7:40:35 AM UTC-7, Patrik Nordwall wrote: expected-response-after is used for starting failure detection when the first heartbeat message is not received as expected, e.g. when connection is broken from the beginning. acceptable-heartbeat-pause is explained here: http://doc.akka.io/docs/akka/2.3.11/scala/cluster-usage.html#Failure_Detector /Patrik On Fri, Jun 5, 2015 at 12:41 AM, Jeff jknigh...@gmail.com wrote: Can somebody enlighten me on how these two settings interact? These two settings seem to overlap. Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause
Can somebody enlighten me on how these two settings interact? These two settings seem to overlap. Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-streams 1.0-RC3] ActorSubscriber instances aren't shut down
I took a page out of the stream-integrations documentation, and hooked a simple ActorSubscriber up as a sink to a Flow. Something like: class TestActorSubscriber extends ActorSubscriber { override def postStop() = { println(Stopped + self.toString) super.postStop } override def preStart() = { println(Started + self.toString) super.preStart } override protected def requestStrategy: RequestStrategy = // some stuff override def receive: Receive = { case OnNext(x) = // some stuff } } implicit val mat = ActorFlowMaterializer() val runnableFlow = Source( () = io.Source.fromFile(foo.txt).getLines() ).to(Sink.actorSubscriber(Props(new TestActorSubscriber()) runnableFlow.run() Since the Sink takes a Props, and not an ActorRef, I assume the ActorSystemMaterializer is responsible for the lifecycle of that ActorSubscriber instance. When I run this flow, things work fine - the TestActorSubscriber is created (preStart fires), the messages stream through. However, the actor instance never stops once the iterator/Source completes. I can verify the OnComplete message gets to the TestActorSubscriber, but I never see the postStop fire. Looking at the ActorSubscriber source, I see that it's designed to stop the actor if there's an attempt to create a subscription after an OnComplete is received. It doesn't look like anything happens in the normal case though, where the OnComplete/OnError is delivered *after* a subscription has already been established. Is this just a bug? How is the lifecycle of ActorSubscriber/ActorPublisher instances managed? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Disambiguate actorRefs
I have an actor that I need to pass several actorRefs into (remote services). Is there a best practice for doing this with type safety (so a user doesn't accidentally pass the wrong actorRef)? Some options I've considered are using scalaz Tags and native scala Value Classes. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Receive as parameter
This is great advice, thanks! On Saturday, May 23, 2015 at 1:58:29 AM UTC-7, rkuhn wrote: Using something like Spores you could ascertain that your partial function does indeed not close over its surrounding Actor when creating anonymous Actors inline, but Spores are still an experiment. An equally safe but not as elegant solution is to declare the partial function in the Actor’s companion object and pass the needed values into the method that instantiates it (just like the `def props(...): Props` recommendation). Regards, Roland 22 maj 2015 kl. 23:57 skrev Jeff jknigh...@gmail.com javascript:: Following up on this question, what are the best practices around creating anonymous actors, as long as you are not closing over context of a parent actor? On Friday, May 22, 2015 at 12:08:45 PM UTC-7, Jeff wrote: Is it bad practice to pass in the Receive pf to an actor as part of the constructor arguments, assuming all vals it closes over are consts? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. *Dr. Roland Kuhn* *Akka Tech Lead* Typesafe http://typesafe.com/ – Reactive apps on the JVM. twitter: @rolandkuhn http://twitter.com/#!/rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka Receive as parameter
Following up on this question, what are the best practices around creating anonymous actors, as long as you are not closing over context of a parent actor? On Friday, May 22, 2015 at 12:08:45 PM UTC-7, Jeff wrote: Is it bad practice to pass in the Receive pf to an actor as part of the constructor arguments, assuming all vals it closes over are consts? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Akka Receive as parameter
Is it bad practice to pass in the Receive pf to an actor as part of the constructor arguments, assuming all vals it closes over are consts? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] system.scheduler.scheduleOnce consumes Heap over time
I thought there was something specific I was doing wrong in an application I have deployed in development, which eventually needs to be production ready. Over time, the Akka actor system slowly consumes all available memory. I assumed it was a memory leak that I introduced, but as it turns out, even the sample Typesafe hello-akka app demonstrates the same heap consumption. http://www.typesafe.com/activator/template/hello-akka If you let the sample application above run for a very long, the heap usage slowly grows, and is never released. Although, it is Very Slow Since it is set to send a message to the greeter only every 1 second, this isn't very apparent. I simplified the pattern I am using in an actual application, that demonstrates the same issue, but at an accelerated rate. This is modeled after the Scheduling Periodic Messages pattern described here: http://doc.akka.io/docs/akka/snapshot/scala/howto.html The full sample code to recreate the memory consumption, with heap usage logging is available at the gist below. (note: if you start with the hello-akka sample application, and replace HelloAkkaScala.scala with this gist below, you can reproduce. You will need to add to your build.sbt the following (as mentioned in the comments at the top of the gist) com.kenshoo %% metrics-play % 2.3.0_0.1.9 https://gist.github.com/jeffsteinmetz/bbccb4815858620ab5a2 You may need to let it run for about 1 to 2 minutes before you see the initial bump in heap usage. After that, you will see it slowly climb and never back down. Note - the metrics calls are not required, and do not cause the memory leak - you can watch system memory without this library and see the same issue. I've tried a few versions of Akka, with the same results. In production, a similar (real world) application eventually consumes all memory. Hoping for a little insight on the Akka internals, and potential solution. J -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Debugging Akka Cluster
Having the debugging profile only suspend the current thread helped a lot, thanks! On Thursday, May 7, 2015 at 1:23:41 AM UTC-7, 何品 wrote: you could just pause the current thread.not all of them.cursive debugger could do that. 在 2015年5月7日星期四 UTC+8上午10:50:41,Jeff写道: Every time I set a breakpoint in an actor involved with Actor Clustering, the phi detector triggers and marks that node as Unreachable. Is there a way to prevent this during debugging? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Debugging Akka Cluster
Every time I set a breakpoint in an actor involved with Actor Clustering, the phi detector triggers and marks that node as Unreachable. Is there a way to prevent this during debugging? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] withAttributes scope
What is the scope of .withAttributes()? Does it only apply to the stage that it is called on, or every stage up to that point? For Example Sink.empty.transform(() = ...).transform(() = ...).withAttributes(ActorOperationAttributes.dispatcher(test-dispatcher)) Are all the transformation stages run under test-dispatcher, or just the last one? If I only want to apply the attributes to a single stage, should I use .via()? Thanks Jeff -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Cluster router significantly outperforms remote router
I have updated the ticket with the config I am using. On Friday, April 10, 2015 at 2:52:47 AM UTC-7, Akka Team wrote: I added a ticket: https://github.com/akka/akka/issues/17171 On Fri, Apr 10, 2015 at 10:23 AM, Endre Varga endre...@typesafe.com javascript: wrote: Wow! This is an interesting finding, we need to investigate. -Endre On Fri, Apr 10, 2015 at 3:38 AM, Jeff jknigh...@gmail.com javascript: wrote: I have been able to isolate the code that is running differently between the two different actor providers. Below is the receive of the offending actor: var now: Long = _ def receive = { case UserMessage.LookupOrgsResponse(_, uo) = now = new Date().getTime db ! OrgMessage.BatchLookupByIds(uo.map( _.orgId )) db ! PersonMessage.BatchLookupById(uo.map( _.personId )) case OrgMessage.BatchLookupResponse(os) = println(sorgs took ${new Date().getTime - now} ms) orgs = Some(os); process case PersonMessage.BatchLookupByIdResponse(ps) = println(spersons took ${new Date().getTime - now} ms) persons = Some(ps); process } When running with provider = akka.remote.RemoteActorRefProvider, I get the following times: persons took 15 ms orgs took 57 ms orgs took 13 ms persons took 55 ms orgs took 15 ms persons took 57 ms orgs took 19 ms persons took 61 ms orgs took 12 ms persons took 52 ms persons took 27 ms orgs took 68 ms orgs took 14 ms persons took 55 ms And when I run it with provider = akka.cluster.ClusterActorRefProvider I get: orgs took 12 ms persons took 14 ms orgs took 15 ms persons took 20 ms persons took 13 ms orgs took 15 ms orgs took 12 ms persons took 16 ms orgs took 10 ms persons took 12 ms Again, nothing in the business logic is changing. I am simply changing the provider in the application.conf and making the routers cluster aware. These results are consistent on a warmed jvm. On Thursday, April 9, 2015 at 12:10:38 AM UTC-7, drewhk wrote: Hi Jeff, On Thu, Apr 9, 2015 at 7:59 AM, Jeff jknigh...@gmail.com wrote: I am building an actor system that basically has a topology of spray - business logic actor - slick. I initially built these using akka remote, but have recently switch over to akka cluster. In testing these systems, I have noticed that round trip calls through the system take half the time using akka cluster as they did using akka remote. Literally, the only thing changed between benchmarks is the actor provider. Why would akka cluster provider perform so much better on identical workloads? I was under the impression that akka cluster built on top of akka remote. This is indeed strange. ClusterActorRefProvider actually extends RemoteActorRefProvider: https://github.com/akka/akka/blob/ 8485cd2ebb46d2fba851c41c03e34436e498c005/akka-cluster/src/ main/scala/akka/cluster/ClusterActorRefProvider.scala#L26 Do you have a small example that demonstrates this? It would be interesting to investigate what is the source of difference. Maybe deathwatch, I don't know. -Endre -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team
[akka-user] Re: [akka-stream] Props Source ignoring dispatcher
This is using 1.0-M5 On Monday, April 13, 2015 at 9:32:38 PM UTC-7, Jeff wrote: I am creating an ActorPublisher to encapsulate a kafka consumer. I am trying to bulkhead the actor behind a custom dispatcher (since the kafka consumer is blocking) with the following code: val in = Source[Array[Byte]](KafkaConsumerActor.props(consumer.createMessageStreamsByTopic(topic).head).withDispatcher(kafka-consumer-dispatcher)) However, when I set a breakpoint in the receive method and inspect the context, the dispatcher is set to default-dispatcher https://lh3.googleusercontent.com/-GSsCrdUDXtc/VSyYFsKunFI/BPM/OQ8Q9tdqGbA/s1600/Screen%2BShot%2B2015-04-13%2Bat%2B9.28.27%2BPM.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] [akka-stream] Props Source ignoring dispatcher
I am creating an ActorPublisher to encapsulate a kafka consumer. I am trying to bulkhead the actor behind a custom dispatcher (since the kafka consumer is blocking) with the following code: val in = Source[Array[Byte]](KafkaConsumerActor.props(consumer.createMessageStreamsByTopic(topic).head).withDispatcher(kafka-consumer-dispatcher)) However, when I set a breakpoint in the receive method and inspect the context, the dispatcher is set to default-dispatcher https://lh3.googleusercontent.com/-GSsCrdUDXtc/VSyYFsKunFI/BPM/OQ8Q9tdqGbA/s1600/Screen%2BShot%2B2015-04-13%2Bat%2B9.28.27%2BPM.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-persistence vs durable mailbox
The main difference I see with the change from a DurableMailbox to using a PersistentChannel/AtLeastOnceDelivery is the former seems pull based while the latter is push. Is this the case? I'm curious if there's a way I can essentially implement a durable mailbox based on the Persistence framework? Basically I want to pile a bunch of messages into this mailbox and have the actor process them when it can with a producer timeout restriction. I know this is related to the Reactive Stream stuff, but I'm interested in something near term and deciding if I should implement a DurableMailbox or spend more time trying to bend Persistence to work with my stateless actor. Thanks -Jeff On Monday, March 24, 2014 8:53:23 AM UTC-6, Patrik Nordwall wrote: The big difference is that when using akka-persistence the messages are not stored immediately when placed in the mailbox. When you receive the Persistent message in the Processor you know that it has been stored. The sender can let go of the responsibility when it has received an explicit acknowledgement message from the Processor. A safe hand-off like that can also be implemented with a PersistentChannel. This is rather close to a durable mailbox, but so much better. class Endpoint extends Actor { val channel = context.actorOf(PersistentChannel.props( PersistentChannelSettings(redeliverInterval = 3.seconds, redeliverMax = 10, replyPersistent = true)), name = myChannel) val destination = context.system / jobManager import context.dispatcher implicit val timeout = Timeout(5.seconds) def receive = { case job: Job = (channel ? Deliver(Persistent(job), destination)) map { case _: Persistent = OK: + job.id } recover { case e = FAILED: + job.id } pipeTo sender() } } Cheers, Patrik On Mon, Mar 24, 2014 at 3:22 PM, Akka Team akka.o...@gmail.com javascript: wrote: Hi Logan, I took a look into the akka-persistence documentation and from what I can gather the goal of the project is to store processed messages so that an actor's state can be rebuilt upon an actor's restart, while I'd like to store unprocessed messages so they can be processed upon an actor's restart (for my purposes, my actor is essentially stateless). Processors ( http://doc.akka.io/docs/akka/2.3.0/scala/persistence.html#Processors) store the message *first* and then process them. If you do not want to keep the whole history of the Processor in the journal you can delete messages: http://doc.akka.io/docs/akka/2.3.0/scala/persistence.html#Message_deletion -Endre Am I understanding this incorrectly? Is there a way to accomplish my goal with the persistence module, or is there an alternative approach? Thanks! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Daemonic Akka creating non-daemon threads
Thx for the feedback! Re ouliving... Afaict, the app just stops getting connections, so its hard to tell there is a problem from the inside. (Fwiw, this app also connects to other systems which might be down; hence the app needs to deal with failure and recover those connections -- simply exiting the app is not an option in those scenarios.) As for periodically... good question.. one thing i'm asking for is the rate of leak. It is not on the order of minutes, more like days, and may actually be weeks (I know this sounds odd, but i dont have direct access to the production systems and need to go through a couple layers of IT folks in different geos and timezones.) Anyhow, I just wanted to check on the options/alternatives that might not be obvious. I'm a fan of getting on the latest stable (but not bleeding edge). -jeff On Wednesday, January 15, 2014 2:25:09 AM UTC-8, Björn Antonsson wrote: Hi Jeff, On Tuesday, 14 January 2014 at 03:54, Jeff Crilly wrote: We've had some recent production issues and it looks like this is the exact issue biting us; (this results in a need to restart our appservers rather periodically.) The app is on akka 2.1.4. I’m just curious as to what you mean by periodically. Also, why does you app server outlive the actor systems created inside it? You shouldn’t need to start/stop actor systems frequently. I scanned the akka 2.1.x to 2.2.x migration guide a while ago (and also just now), and it didn't seem like a drop in library change. (Am I missing something?) You are right that there are a number of changes between 2.1 and 2.2 that might make upgrading (depending on you application) include some rewriting of you application. The easiest thing is probably to try to build against 2.2.x. Any other thoughts on how we might mitigate the issue alternatively w/o 2.2 upgrade? (Thought i'd ask anyhow.) (Or do we.. ahem.. need to bite the bullet and do the upgrade? This is an application that is undergoing minor bug fixes, but not major changes, hence rolling a full regression etc is not quite in the plan; we were looking for a narrow fix/solution rather than (afaict) refactoring the akka usage.) I think that upgrading to 2.2 is the only available solution for the thread leakage right now, unless you keep the same actor system around in the app server. B/ thx! On Sunday, June 23, 2013 9:55:35 AM UTC-7, √ wrote: I just want to clarify that there is no ship date for a 2.1.5 release at this date. Cheers, √ On Sun, Jun 23, 2013 at 12:44 PM, Matei Zaharia matei@gmail.comwrote: Awesome, thanks! Matei On Jun 23, 2013, at 12:39 PM, √iktor Ҡlang viktor...@gmail.com wrote: https://github.com/akka/akka/pull/1559/files On Sat, Jun 22, 2013 at 7:16 PM, Matei Zaharia matei@gmail.comwrote: Yes, it was in 2.1.4, and it looks like it's been fixed in https://www.assembla.com/spaces/akka/tickets/3436. I'm curious, will the fix be merged into 2.1.5, or should I upgrade to 2.2? Matei On Wednesday, June 12, 2013 8:07:53 AM UTC-4, Patrik Nordwall wrote: Thanks for reporting. We will look into it. Is it akka 2.1.4? Otherwise, please try with that version. /Patrik On Sun, Jun 9, 2013 at 1:07 AM, Matei Zaharia matei@gmail.com wrote: Hi, I've noticed that in Akka 2.1.x, setting the daemonic flag to true no longer ensures that all of Akka's threads are daemons. Instead, if you register any remote actors, there are some leftover threads like this: New I/O server boss #18 prio=10 tid=0x7f83949eb000 nid=0x5fb runnable [0x7f836d0a8000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked 0xfeae8170 (a sun.nio.ch.Util$2) - locked 0xfeae8180 (a java.util.Collections$ UnmodifiableSet) - locked 0xfeae8128 (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:102) at org.jboss.netty.channel.socket.nio.NioServerBoss. select(NioServerBoss.java:163) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run( AbstractNioSelector.java:206) at org.jboss.netty.channel.socket.nio.NioServerBoss.run( NioServerBoss.java:42) at org.jboss.netty.util.ThreadRenamingRunnable.run( ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run( DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) New I/O worker #17 prio=10 tid
Re: [akka-user] Daemonic Akka creating non-daemon threads
We've had some recent production issues and it looks like this is the exact issue biting us; (this results in a need to restart our appservers rather periodically.) The app is on akka 2.1.4. I scanned the akka 2.1.x to 2.2.x migration guide a while ago (and also just now), and it didn't seem like a drop in library change. (Am I missing something?) Any other thoughts on how we might mitigate the issue alternatively w/o 2.2 upgrade? (Thought i'd ask anyhow.) (Or do we.. ahem.. need to bite the bullet and do the upgrade? This is an application that is undergoing minor bug fixes, but not major changes, hence rolling a full regression etc is not quite in the plan; we were looking for a narrow fix/solution rather than (afaict) refactoring the akka usage.) thx! On Sunday, June 23, 2013 9:55:35 AM UTC-7, √ wrote: I just want to clarify that there is no ship date for a 2.1.5 release at this date. Cheers, √ On Sun, Jun 23, 2013 at 12:44 PM, Matei Zaharia matei@gmail.comjavascript: wrote: Awesome, thanks! Matei On Jun 23, 2013, at 12:39 PM, √iktor Ҡlang viktor...@gmail.comjavascript: wrote: https://github.com/akka/akka/pull/1559/files On Sat, Jun 22, 2013 at 7:16 PM, Matei Zaharia matei@gmail.comjavascript: wrote: Yes, it was in 2.1.4, and it looks like it's been fixed in https://www.assembla.com/spaces/akka/tickets/3436. I'm curious, will the fix be merged into 2.1.5, or should I upgrade to 2.2? Matei On Wednesday, June 12, 2013 8:07:53 AM UTC-4, Patrik Nordwall wrote: Thanks for reporting. We will look into it. Is it akka 2.1.4? Otherwise, please try with that version. /Patrik On Sun, Jun 9, 2013 at 1:07 AM, Matei Zaharia matei@gmail.comwrote: Hi, I've noticed that in Akka 2.1.x, setting the daemonic flag to true no longer ensures that all of Akka's threads are daemons. Instead, if you register any remote actors, there are some leftover threads like this: New I/O server boss #18 prio=10 tid=0x7f83949eb000 nid=0x5fb runnable [0x7f836d0a8000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl. java:81) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked 0xfeae8170 (a sun.nio.ch.Util$2) - locked 0xfeae8180 (a java.util.Collections$ UnmodifiableSet) - locked 0xfeae8128 (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:102) at org.jboss.netty.channel.socket.nio.NioServerBoss. select(NioServerBoss.java:163) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run( AbstractNioSelector.java:206) at org.jboss.netty.channel.socket.nio.NioServerBoss.run( NioServerBoss.java:42) at org.jboss.netty.util.ThreadRenamingRunnable.run( ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run( DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) New I/O worker #17 prio=10 tid=0x7f83949d9000 nid=0x5fa runnable [0x7f836d1a9000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl. java:81) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked 0xfeb200e8 (a sun.nio.ch.Util$2) - locked 0xfeb200f8 (a java.util.Collections$ UnmodifiableSet) - locked 0xfeb200a0 (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at org.jboss.netty.channel.socket.nio.SelectorUtil. select(SelectorUtil.java:64) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select( AbstractNioSelector.java:409) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run( AbstractNioSelector.java:206) at org.jboss.netty.channel.socket.nio.AbstractNioWorker. run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run( NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run( ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run( DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Calling ActorSystem.shutdown() does shut these down but, I'd