Re: [akka-user] import context.dispatcher nullpointerexception

2018-03-08 Thread Jeff
I agree with Heiko on this one - import context.dispatcher is used 
everywhere. The docs do mention not to close over 'context' and access it 
outside of message handlers; however, I think more education is needed 
here. 
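A minimal sketch (mine, not from the thread) of the pattern Patrik describes 
below - capturing the dispatcher in a val during construction, so that futures 
completing after the actor has stopped never go through the cleared context 
field:

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

class SafeActor extends Actor {
  // captured once while the actor is constructed; this val is not cleared on stop
  implicit val ec: ExecutionContext = context.dispatcher

  def receive: Receive = {
    case query: String =>
      // chained map/flatMap and pipeTo keep working even if the actor stops
      // before the future completes
      Future(query.length).map(_ * 2).pipeTo(sender())
  }
}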

On Wednesday, March 7, 2018 at 5:35:05 PM UTC-8, Konrad Malawski wrote:
>
> Yes it is nulled using unsafe.
>
> -- 
> Cheers,
> Konrad 'ktoso <http://kto.so>' Malawski
> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>
> On March 8, 2018 at 7:32:56, Heiko Seeberger (loe...@posteo.de 
> ) wrote:
>
> `import context.dispatcher` is what „everybody“ is doing, not? I’m using 
> it all over the place, because I have learned that one can import from 
> stable identifiers (e.g. vals) in Scala. Hence I don’t think changing the 
> docs has the necessary effect.
>
> How can a final val be „nulled"? Unsafe? Reflection?
>
> Cheers
> Heiko
>
> --
>
> Heiko Seeberger
> Home: heikoseeberger.de
> Twitter: @hseeberger
> Public key: keybase.io/hseeberger
>
>
>
>
> Am 07.03.2018 um 19:23 schrieb Patrik Nordwall <patrik@gmail.com 
> >:
>
> Thanks!
>
> On Wed, Mar 7, 2018 at 6:16 PM, Jeff <jknigh...@gmail.com > 
> wrote:
>
>> An example of documentation for using import context.dispatcher is here 
>> https://doc.akka.io/docs/akka/2.5/futures.html#within-actors 
>>
>> I can create a PR to update the documentation.
>>
>> On Wednesday, March 7, 2018 at 6:09:24 AM UTC-8, Patrik Nordwall wrote: 
>>>
>>> It's because when the actor is stopped some of the fields are cleared 
>>> (yeah, even though they are vals) to "help" the GC in case something (e.g. 
>>> a local ActorRef) is still referencing the actor instance. 
>>>
>>> implicit val ec = context.dispatcher
>>>
>>> would solve it here.
>>>
>>> Where in the documentation is the import recommended? We should probably 
>>> update that. Would you be able to create a PR fixing it? Thanks.
>>>
>>> /Patrik
>>>
>>> On Tue, Mar 6, 2018 at 10:44 PM, Jeff <jknigh...@gmail.com> wrote:
>>>
>>>> I suspected as much. So what would you suggest for handling use cases 
>>>> where you have chained flatMaps on futures that at the end will 
>>>> pipeTo a message back to the actor? Should we set the ExecutionContext 
>>>> to a val? 
>>>>
>>>> On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote: 
>>>>>
>>>>> Context is bound to the lifecycle of the actor.
>>>>>
>>>>> On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com> wrote:
>>>>>
>>>>>> I have noticed an issue where if a future maps/flatmaps after actor 
>>>>>> shutdown, a NullPointerException is thrown. I've narrowed it down to 
>>>>>> the import context.dispatcher, which I technically understand since 
>>>>>> that is closing over actor state. If I change that to implicit val 
>>>>>> ec = context.dispatcher, everything works fine.  
>>>>>>
>>>>>> I'd like to understand what is the best practice in this case, since 
>>>>>> the documentation for context.dispatcher indicates that it is 
>>>>>> threadsafe and looking at the actor trait, context is a val. Most 
>>>>>> documentation seems to indicate that import context.dispatcher is 
>>>>>> preferred. 
>>>>>>
>>>>>> Thanks
>>>>>> Jeff
>>>>>>

Re: [akka-user] import context.dispatcher nullpointerexception

2018-03-07 Thread Jeff
An example of documentation for using import context.dispatcher is 
here https://doc.akka.io/docs/akka/2.5/futures.html#within-actors

I can create a PR to update the documentation.

On Wednesday, March 7, 2018 at 6:09:24 AM UTC-8, Patrik Nordwall wrote:
>
> It's because when the actor is stopped some of the fields are cleared 
> (yeah, even though they are vals) to "help" the GC in case something (e.g. 
> a local ActorRef) is still referencing the actor instance.
>
> implicit val ec = context.dispatcher
>
> would solve it here.
>
> Where in the documentation is the import recommended? We should probably 
> update that. Would you be able to create a PR fixing it? Thanks.
>
> /Patrik
>
> On Tue, Mar 6, 2018 at 10:44 PM, Jeff <jknigh...@gmail.com > 
> wrote:
>
>> I suspected as much. So what would you suggest for handling use cases 
>> where you have chained flatMaps on futures that at the end will 
>> pipeTo a message back to the actor? Should we set the ExecutionContext to 
>> a val? 
>>
>> On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote:
>>>
>>> Context is bound to the lifecycle of the actor.
>>>
>>> On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com> wrote:
>>>
>>>> I have noticed an issue where if a future maps/flatmaps after actor 
>>>> shutdown, a NullPointerException is thrown. I've narrowed it down to 
>>>> the import context.dispatcher, which I technically understand since 
>>>> that is closing over actor state. If I change that to implicit val ec 
>>>> = context.dispatcher, everything works fine. 
>>>>
>>>> I'd like to understand what is the best practice in this case, since 
>>>> the documentation for context.dispatcher indicates that it is 
>>>> threadsafe and looking at the actor trait, context is a val. Most 
>>>> documentation seems to indicate that import context.dispatcher is 
>>>> preferred. 
>>>>
>>>> Thanks
>>>> Jeff
>>>>
>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>
>
>
> -- 
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
>



Re: [akka-user] import context.dispatcher nullpointerexception

2018-03-06 Thread Jeff
I suspected as much. So what would you suggest for handling use cases 
where you have chained flatMaps on futures that at the end will 
pipeTo a message back to the actor? Should we set the ExecutionContext to a 
val? 

On Tuesday, March 6, 2018 at 12:09:22 PM UTC-8, √ wrote:
>
> Context is bound to the lifecycle of the actor.
>
> On Tue, Mar 6, 2018 at 8:37 PM, Jeff <jknigh...@gmail.com > 
> wrote:
>
>> I have noticed an issue where if a future maps/flatmaps after actor 
>> shutdown, a NullPointerException is thrown. I've narrowed it down to the 
>> import 
>> context.dispatcher, which I technically understand since that is closing 
>> over actor state. If I change that to implicit val ec = 
>> context.dispatcher, everything works fine. 
>>
>> I'd like to understand what is the best practice in this case, since the 
>> documentation for context.dispatcher indicates that it is threadsafe and 
>> looking at the actor trait, context is a val. Most documentation seems 
>> to indicate that import context.dispatcher is preferred. 
>>
>> Thanks
>> Jeff
>>
>
>
>
> -- 
> Cheers,
> √
>



[akka-user] import context.dispatcher nullpointerexception

2018-03-06 Thread Jeff
I have noticed an issue where if a future maps/flatmaps after actor 
shutdown, a NullPointerException is thrown. I've narrowed it down to the import 
context.dispatcher, which I technically understand since that is closing 
over actor state. If I change that to implicit val ec = context.dispatcher, 
everything works fine. 

I'd like to understand what is the best practice in this case, since the 
documentation for context.dispatcher indicates that it is threadsafe and 
looking at the actor trait, context is a val. Most documentation seems to 
indicate that import context.dispatcher is preferred. 

Thanks
Jeff
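A short sketch (mine, not from the thread) of the shape of the problem: the 
import resolves against `context` at every use site, so any use of the implicit 
dispatcher that happens after the actor has stopped reads the cleared field, 
whereas a val captured at construction keeps a direct reference to the 
dispatcher.

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

class ImportingActor extends Actor {
  import context.dispatcher // each implicit use compiles to context.dispatcher

  def receive: Receive = {
    case query: String =>
      // if a map/flatMap that needs the implicit dispatcher is evaluated after
      // this actor has stopped, context.dispatcher dereferences a cleared field;
      // capturing the dispatcher in a val up front avoids this
      Future(query.length).map(_ * 2).pipeTo(sender())
  }
}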



Re: [akka-user] Flow.mapAsync and downstream demand

2018-02-23 Thread Jeff
Actually found a much simpler solution using Source.unfoldAsync. 
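For reference, a rough sketch (mine, under assumptions) of that approach for a 
long poll where the previous response carries the id for the next request; 
`poll` and `PollResult` are hypothetical placeholders, not part of the thread:

import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

final case class PollResult(nextIndex: Long, items: Seq[String])

// hypothetical long-poll call returning the next index alongside the data
def poll(lastIndex: Long)(implicit ec: ExecutionContext): Future[PollResult] = ???

def updates(implicit ec: ExecutionContext): Source[PollResult, NotUsed] =
  Source.unfoldAsync(0L) { lastIndex =>
    // unfoldAsync only invokes this again once downstream has demanded the
    // next element, so there is no eager pull as with mapAsync
    poll(lastIndex).map(result => Some((result.nextIndex, result)))
  }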

On Thursday, February 22, 2018 at 6:54:54 PM UTC-8, Jeff wrote:
>
> My particular use case is a long poll where the id to be sent on the next 
> request is returned from the previous request.
>
> On Thursday, February 22, 2018 at 6:40:48 PM UTC-8, Konrad Malawski wrote:
>>
>> In general relying on exact timing of pulls is rather seen as an anti 
>> pattern…
>> Even if you do a custom stage, it’s very brittle, as any element put 
>> before your stage could cause pulls, simply because that’s how things work.
>> The same can be said about simply changed buffer sizes between stages — 
>> introducing an async boundary anywhere in the pipeline can lead 
>> to more buffer space being allocated, and thus, more pulls being issued.
>>
>> In other words — due to the fact that backpressure is connected through 
>> an entire pipeline (and has to, otherwise it would not do what it should),
>> you can not rely on exact timing of pulls, because changes in the 
>> pipeline may affect this.
>>
>> In your very specific case perhaps it’s doable to make a stage or 
>> pipeline that’ll do the right thing, with enforcing it with zips and loops,
>> but that’ll be specific and take some work and very precise wording of 
>> what you want to achieve (sample code?)
>>
>> -- 
>> Cheers,
>> Konrad 'ktoso <http://kto.so>' Malawski
>> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>>
>> On February 23, 2018 at 7:05:18, Jeff (jknigh...@gmail.com) wrote:
>>
>> Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream 
>> even if there is no demand from downstream and then buffer. However, there 
>> are situations where one might want to wait for explicit downstream demand 
>> before pulling. For example, let's say that the next upstream item depends 
>> on the results of the previous item and that you need to feed that back 
>> into the upstream before pulling again. In its current form, Flow.mapAsync 
>> will pull before that feedback loop has been completed.  
>>
>> Is there any way to work around that other than writing a custom 
>> GraphStage? 
>>
>> Thanks
>> Jeff
>>



Re: [akka-user] Flow.mapAsync and downstream demand

2018-02-22 Thread Jeff
My particular use case is a long poll where the id to be sent on the next 
request is returned from the previous request.

On Thursday, February 22, 2018 at 6:40:48 PM UTC-8, Konrad Malawski wrote:
>
> In general relying on exact timing of pulls is rather seen as an anti 
> pattern…
> Even if you do a custom stage, it’s very brittle, as any element put 
> before your stage could cause pulls, simply because that’s how things work.
> The same can be said about simply changed buffer sizes between stages — 
> introducing an async boundary anywhere in the pipeline can lead 
> to more buffer space being allocated, and thus, more pulls being issued.
>
> In other words — due to the fact that backpressure is connected through an 
> entire pipeline (and has to, otherwise it would not do what it should),
> you can not rely on exact timing of pulls, because changes in the pipeline 
> may affect this.
>
> In your very specific case perhaps it’s doable to make a stage or pipeline 
> that’ll do the right thing, with enforcing it with zips and loops,
> but that’ll be specific and take some work and very precise wording of 
> what you want to achieve (sample code?)
>
> -- 
> Cheers,
> Konrad 'ktoso <http://kto.so>' Malawski
> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>
> On February 23, 2018 at 7:05:18, Jeff (jknigh...@gmail.com ) 
> wrote:
>
> Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream 
> even if there is no demand from downstream and then buffer. However, there 
> are situations where one might want to wait for explicit downstream demand 
> before pulling. For example, let's say that the next upstream item depends 
> on the results of the previous item and that you need to feed that back 
> into the upstream before pulling again. In its current form, Flow.mapAsync 
> will pull before that feedback loop has been completed.  
>
> Is there any way to work around that other than writing a custom 
> GraphStage? 
>
> Thanks
> Jeff
>



[akka-user] Flow.mapAsync and downstream demand

2018-02-22 Thread Jeff
Currently, Flow.mapAsync and Flow.mapAsyncUnordered always pull upstream 
even if there is no demand from downstream and then buffer. However, there 
are situations where one might want to wait for explicit downstream demand 
before pulling. For example, let's say that the next upstream item depends 
on the results of the previous item and that you need to feed that back 
into the upstream before pulling again. In its current form, Flow.mapAsync 
will pull before that feedback loop has been completed. 

Is there any way to work around that other than writing a custom 
GraphStage? 

Thanks
Jeff



[akka-user] Re: Single Writer/Multiple Reader and anonymous actors

2018-01-03 Thread Jeff
Bumping this - what are the best practices for using anonymous actors to 
modify shared state?

Thanks
Jeff

On Wednesday, December 27, 2017 at 4:51:04 PM UTC-8, Jeff wrote:
>
> I have a class that is read heavy with low contention on writes. I'd like 
> to be able to access a piece of shared mutable state outside of Actors. The 
> state itself (AnnouncementRegistry) is immutable. Since it is a single 
> writer, having a shared @volatile var between the actor and the class seems 
> to be a good approach, however it also has a code smell to it since I am 
> closing over external mutable state and using an anonymous actor. Is there 
> a better approach to what I have below?
>
> class DiscoveryClient(config: DiscoveryConfig, singleRequest: HttpRequest => 
> Future[HttpResponse])
>  (implicit system: ActorSystem, m: Materializer) {
>
>   @volatile
>   private[this] var _registry = AnnouncementRegistry.empty
>
>   def registry = _registry
>
>   private[this] val client = system.actorOf(Props(new Actor with ActorLogging 
> {
> import DiscoveryClient._
>
> private[this] var subscribers = HashSet.empty[ActorRef]
>
> private[this] val killswitch =
>   ChangeWatcher(config.seeds, singleRequest).to(Sink.foreach(self ! 
> RegistryUpdate(_))).run()
>
> def receive: Receive = uninitialized
>
> private[this] def uninitialized: Receive = {
>   case Subscribe =>
> subscribers += sender
> context.watch(sender)
>
>   case Terminated(ref) =>
> subscribers -= ref
>
>   case m @ RegistryUpdate(newRegistry) =>
> _registry = newRegistry
> context.become(initialized)
> subscribers.foreach(_ ! m)
> }
>
> private[this] def initialized: Receive = {
>   case Subscribe =>
> subscribers += sender
> context.watch(sender)
> sender ! RegistryUpdate(_registry)
>
>   case Terminated(ref) =>
> subscribers -= ref
>
>   case m @ RegistryUpdate(newRegistry) =>
> _registry = newRegistry
> subscribers.foreach(_ ! m)
> }
>
> override def postStop(): Unit = killswitch.shutdown()
>   }), "discovery-client")
> }
>
>



[akka-user] Single Writer/Multiple Reader and anonymous actors

2017-12-27 Thread Jeff
I have a class that is read-heavy with low contention on writes. I'd like 
to be able to access a piece of shared mutable state outside of Actors. The 
state itself (AnnouncementRegistry) is immutable. Since there is a single 
writer, having a shared @volatile var between the actor and the class seems 
to be a good approach; however, it also has a code smell to it since I am 
closing over external mutable state and using an anonymous actor. Is there 
a better approach to what I have below?

class DiscoveryClient(config: DiscoveryConfig, singleRequest: HttpRequest => Future[HttpResponse])
                     (implicit system: ActorSystem, m: Materializer) {

  @volatile
  private[this] var _registry = AnnouncementRegistry.empty

  def registry = _registry

  private[this] val client = system.actorOf(Props(new Actor with ActorLogging {
    import DiscoveryClient._

    private[this] var subscribers = HashSet.empty[ActorRef]

    private[this] val killswitch =
      ChangeWatcher(config.seeds, singleRequest).to(Sink.foreach(self ! RegistryUpdate(_))).run()

    def receive: Receive = uninitialized

    private[this] def uninitialized: Receive = {
      case Subscribe =>
        subscribers += sender
        context.watch(sender)

      case Terminated(ref) =>
        subscribers -= ref

      case m @ RegistryUpdate(newRegistry) =>
        _registry = newRegistry
        context.become(initialized)
        subscribers.foreach(_ ! m)
    }

    private[this] def initialized: Receive = {
      case Subscribe =>
        subscribers += sender
        context.watch(sender)
        sender ! RegistryUpdate(_registry)

      case Terminated(ref) =>
        subscribers -= ref

      case m @ RegistryUpdate(newRegistry) =>
        _registry = newRegistry
        subscribers.foreach(_ ! m)
    }

    override def postStop(): Unit = killswitch.shutdown()
  }), "discovery-client")
}



[akka-user] Re: RestartSource/Flow/Sink practical examples

2017-10-13 Thread Jeff
Still trying to figure out how I can use the new RestartSink/Flow/Source to 
retry an Http request in a Stream. Any thoughts?

On Thursday, October 5, 2017 at 12:29:07 PM UTC-7, Jeff wrote:
>
> I am trying to create a polling flow which feeds the results of the last 
> update into the next update (much like the SSE example here 
> https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential 
> backoff on failed requests. The new RestartSource/Flow/Sink seems like the 
> correct fit, however I can't seem to get it to work in practice. The 
> strawman example I have is 
>
> val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex =>
>
>   val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => {
> Flow[Long].mapAsyncUnordered(1) { lastIndex =>
>   val p = Promise[WatchResult]
>   pool ! HAConnectionPool.Watch(lastIndex, p)
>   p.future
> }
>   })
>
>   Source.repeat(lastIndex).via(request).runWith(Sink.head)
> }
>
>
> The problem with this however is that mapAsyncUnordered() will always 
> pull the upstream when the future completes, even if there is no downstream 
> demand, causing 2 requests to be made instead of one. I thought about using 
> RestartSource, but it appears that it will keep resetting the 
> Source.single on completeStage(), which is undesirable. 
>
> Thoughts?
>
>



[akka-user] Re: RestartSource/Flow/Sink practical examples

2017-10-09 Thread Jeff
bump

On Thursday, October 5, 2017 at 12:29:07 PM UTC-7, Jeff wrote:
>
> I am trying to create a polling flow which feeds the results of the last 
> update into the next update (much like the SSE example here 
> https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential 
> backoff on failed requests. The new RestartSource/Flow/Sink seems like the 
> correct fit, however I can't seem to get it to work in practice. The 
> strawman example I have is 
>
> val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex =>
>
>   val request = RestartFlow.withBackoff(1.second, 10.seconds, .20)(() => {
> Flow[Long].mapAsyncUnordered(1) { lastIndex =>
>   val p = Promise[WatchResult]
>   pool ! HAConnectionPool.Watch(lastIndex, p)
>   p.future
> }
>   })
>
>   Source.repeat(lastIndex).via(request).runWith(Sink.head)
> }
>
>
> The problem with this however is that mapAsyncUnordered() will always 
> pull the upstream when the future completes, even if there is no downstream 
> demand, causing 2 requests to be made instead of one. I thought about using 
> RestartSource, but it appears that it will keep resetting the 
> Source.single on completeStage(), which is undesirable. 
>
> Thoughts?
>
>



[akka-user] RestartSource/Flow/Sink practical examples

2017-10-05 Thread Jeff
I am trying to create a polling flow which feeds the results of the last 
update into the next update (much like the SSE example 
here https://youtu.be/-7OyuTMgI1I). However, I'd like to add an exponential 
backoff on failed requests. The new RestartSource/Flow/Sink seems like the 
correct fit; however, I can't seem to get it to work in practice. The 
strawman example I have is 

val flow = Flow[Long].mapAsyncUnordered(1) { lastIndex =>

  val request = RestartFlow.withBackoff(1.second, 10.seconds, 0.20)(() => {
    Flow[Long].mapAsyncUnordered(1) { lastIndex =>
      val p = Promise[WatchResult]
      pool ! HAConnectionPool.Watch(lastIndex, p)
      p.future
    }
  })

  Source.repeat(lastIndex).via(request).runWith(Sink.head)
}


The problem with this, however, is that mapAsyncUnordered() will always pull 
the upstream when the future completes, even if there is no downstream 
demand, causing 2 requests to be made instead of one. I thought about using 
RestartSource, but it appears that it will keep resetting the Source.single 
on completeStage(), which is undesirable. 

Thoughts?



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
A Consul-like service discovery client built on top of akka-http



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
That's what I was leaning towards as well, since materializers are fairly 
lightweight. However, I've seen that statement prefaced with "...right now", so 
I wanted to make sure there wasn't some other way to share. 



Re: [akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
How, exactly? You only have access to an ExtendedActorSystem inside of 
createExtension().
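One way this can work (a sketch of mine, not from the thread): ActorMaterializer 
takes an implicit ActorRefFactory, and the ExtendedActorSystem handed to 
createExtension() qualifies, so the extension can own a materializer tied to the 
system's lifecycle. Names below are illustrative:

import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import akka.stream.{ActorMaterializer, Materializer}

class MyExtensionImpl(system: ExtendedActorSystem) extends Extension {
  // one materializer per extension; lives as long as the actor system
  implicit val materializer: Materializer = ActorMaterializer()(system)
}

object MyExtension extends ExtensionId[MyExtensionImpl] with ExtensionIdProvider {
  override def lookup = MyExtension
  override def createExtension(system: ExtendedActorSystem) = new MyExtensionImpl(system)
}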



[akka-user] [akka-streams] Shared Materializer in akka extensions

2017-09-29 Thread Jeff
I am building an akka extension, but there doesn't seem to be an elegant 
way to pass in a shared Materializer. What would be the suggested way to 
use a shared materializer between akka extensions and the main application?



[akka-user] Re: [akka-http] Http().superPool() and MergeHub.source backpressure

2017-08-15 Thread Jeff
I'm embarrassed I forgot something so obvious. Much thanks!
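For reference, a sketch (mine) of what the fixed sink can look like, replacing 
the Sink.foreach from the example quoted below and assuming the implicit 
materializer from that example is in scope:

import akka.Done
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

def printAndDiscard(implicit mat: Materializer): Sink[(Try[HttpResponse], Int), Future[Done]] =
  Sink.foreach[(Try[HttpResponse], Int)] {
    case (Success(response), id) =>
      response.discardEntityBytes() // frees the pooled connection for the next request
      println(s"finished $id")
    case (Failure(error), id) =>
      println(s"failed $id: $error")
  }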

On Tuesday, August 15, 2017 at 8:50:27 AM UTC-7, johannes...@lightbend.com 
wrote:
>
> Hi Jeff,
>
> if you don't read the response bodies of all the responses, your pipeline 
> will stall because the super pool connection are still waiting for your 
> code to actually read the responses. In your example, try to add 
> `x.discardEntityBytes` (or actually read the entity) inside of the 
> `Sink.foreach` block.
>
> See 
> http://doc.akka.io/docs/akka-http/current/scala/http/implications-of-streaming-http-entity.html
>  
> for more information on that topic.
>
> Johannes
>
> On Thursday, August 10, 2017 at 1:53:18 AM UTC+2, Jeff wrote:
>>
>> I am getting behavior that I do not  understand combining 
>> Http().superPool() with MergeHub.source. Here is a sample application
>>
>> import akka.actor.ActorSystem
>> import akka.http.scaladsl.Http
>> import akka.http.scaladsl.model.{HttpRequest, Uri}
>> import akka.stream.{ActorMaterializer, ThrottleMode}
>> import akka.stream.scaladsl.{MergeHub, Sink, Source}
>> import com.typesafe.config.ConfigFactory
>>
>> import scala.concurrent.duration._
>>
>> object Main extends App {
>>   val config = ConfigFactory.load()
>>
>>   implicit val system = ActorSystem("test-system", config)
>>   implicit val executionContext = system.dispatcher
>>   implicit val materializer = ActorMaterializer()
>>
>>   val request = HttpRequest(uri = Uri("http://www.google.com"))
>>
>>   val sink = MergeHub.source[(HttpRequest, Int)]
>> .throttle(1, 1.second, 1, ThrottleMode.shaping)
>> .map { x =>
>>   println(s"started ${x._2}")
>>   x
>> }
>> .via(Http().superPool())
>> .to(Sink.foreach { x =>
>>   println(s"finished ${x._2}")
>> }).run()
>>
>>   for (x <- 1 to 100) sink.runWith(Source.single(request -> x))
>> }
>>
>>
>> The output of which is 
>>
>> started 2
>> finished 2
>> started 3
>> finished 3
>> started 4
>> finished 4
>> started 1
>> finished 1
>> started 5
>> started 6
>> started 7
>> started 8
>>
>> My understanding from the documentation is that Http().superPool() will 
>> backpressure. However, after 8 iterations, nothing else happens. Thoughts?
>>
>> Thanks
>> Jeff
>>
>



[akka-user] Re: [akka-http] Http().superPool() and MergeHub.source backpressure

2017-08-14 Thread Jeff
Quickly following up on this. My understanding of how MergeHub works leads 
me to believe that all 100 requests in my example should fire; however, only 
8 of them do, and only 4 complete. 

On Wednesday, August 9, 2017 at 4:53:18 PM UTC-7, Jeff wrote:
>
> I am getting behavior that I do not  understand combining 
> Http().superPool() with MergeHub.source. Here is a sample application
>
> import akka.actor.ActorSystem
> import akka.http.scaladsl.Http
> import akka.http.scaladsl.model.{HttpRequest, Uri}
> import akka.stream.{ActorMaterializer, ThrottleMode}
> import akka.stream.scaladsl.{MergeHub, Sink, Source}
> import com.typesafe.config.ConfigFactory
>
> import scala.concurrent.duration._
>
> object Main extends App {
>   val config = ConfigFactory.load()
>
>   implicit val system = ActorSystem("test-system", config)
>   implicit val executionContext = system.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   val request = HttpRequest(uri = Uri("http://www.google.com"))
>
>   val sink = MergeHub.source[(HttpRequest, Int)]
> .throttle(1, 1.second, 1, ThrottleMode.shaping)
> .map { x =>
>   println(s"started ${x._2}")
>   x
> }
> .via(Http().superPool())
> .to(Sink.foreach { x =>
>   println(s"finished ${x._2}")
> }).run()
>
>   for (x <- 1 to 100) sink.runWith(Source.single(request -> x))
> }
>
>
> The output of which is 
>
> started 2
> finished 2
> started 3
> finished 3
> started 4
> finished 4
> started 1
> finished 1
> started 5
> started 6
> started 7
> started 8
>
> My understanding from the documentation is that Http().superPool() will 
> backpressure. However, after 8 iterations, nothing else happens. Thoughts?
>
> Thanks
> Jeff
>



[akka-user] [akka-http] Http().superPool() and MergeHub.source backpressure

2017-08-09 Thread Jeff
I am getting behavior that I do not understand when combining 
Http().superPool() with MergeHub.source. Here is a sample application:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object Main extends App {
  val config = ConfigFactory.load()

  implicit val system = ActorSystem("test-system", config)
  implicit val executionContext = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val request = HttpRequest(uri = Uri("http://www.google.com"))

  val sink = MergeHub.source[(HttpRequest, Int)]
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .map { x =>
      println(s"started ${x._2}")
      x
    }
    .via(Http().superPool())
    .to(Sink.foreach { x =>
      println(s"finished ${x._2}")
    }).run()

  for (x <- 1 to 100) sink.runWith(Source.single(request -> x))
}


The output of which is 

started 2
finished 2
started 3
finished 3
started 4
finished 4
started 1
finished 1
started 5
started 6
started 7
started 8

My understanding from the documentation is that Http().superPool() will 
backpressure. However, after 8 iterations, nothing else happens. Thoughts?

Thanks
Jeff



[akka-user] Flow.fromSinkAndSource and backpressure

2017-08-01 Thread Jeff
I didn't see much in the documentation, so I thought I'd ask. If I have a 
Sink.actorRef and a Source.actorRef, and I combine them using
 Flow.fromSinkAndSource, does this somehow propagate backpressure?



[akka-user] Re: [akka-streams] Generic streams and abstract types

2017-07-13 Thread Jeff
My understanding from the documentation is that using Http.singleRequest 
will fail requests when in a backpressure situation, while using a MergeHub 
with the flow will in fact backpressure. Is that a correct understanding?

Jeff

On Thursday, July 13, 2017 at 4:46:39 AM UTC-7, johannes...@lightbend.com 
wrote:
>
> On Wednesday, July 12, 2017 at 9:08:52 PM UTC+2, Jeff wrote:
>>
>> As for the issue of complexity, it's actually not as complex as it 
>> sounds. I'm using Http().superPool() to make api requests and I wanted to 
>> avoid having to create a separate stream for every single iteration of api 
>> request when the only thing that changed was the Unmarshaller. Instead of 
>> materializing multiple streams where the only thing that changed was the 
>> Sink, I just created one stream where the Sink.foreach(...) take the 
>> Unmarshaller function and resolves the Promise. 
>>
>
> You could just use Http.singleRequest in that case because it implements 
> almost the same kind of logic and also uses the same pool as superPool.
>
> Johannes
>



[akka-user] Re: [akka-streams] Generic streams and abstract types

2017-07-12 Thread Jeff
Thanks for the great suggestions - I eventually came to the "custom tuple" 
solution myself and it seems to work well. 

As for the issue of complexity, it's actually not as complex as it sounds. 
I'm using Http().superPool() to make api requests and I wanted to avoid 
having to create a separate stream for every single iteration of api 
request when the only thing that changed was the Unmarshaller. Instead of 
materializing multiple streams where the only thing that changed was the 
Sink, I just created one stream where the Sink.foreach(...) takes the 
Unmarshaller function and resolves the Promise. 
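For anyone finding this later, a sketch (mine) of the custom-tuple shape from 
Johannes's suggestion, tying the Promise's element type to the builder's Out 
member; it assumes an implicit Materializer is in scope, as in the strawman 
quoted below:

import akka.stream.scaladsl.{MergeHub, Sink}
import scala.concurrent.Promise

case class BuilderWithPromise[O](builder: RequestBuilder { type Out = O }, promise: Promise[O]) {
  // completing inside the case class keeps builder and promise tied to the same O,
  // which avoids fighting the existential when elements are handled as BuilderWithPromise[_]
  def fulfill(): Unit = builder.complete(promise)
}

val sink = MergeHub.source[BuilderWithPromise[_]]
  .to(Sink.foreach[BuilderWithPromise[_]](_.fulfill()))
  .run()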

On Wednesday, July 12, 2017 at 5:24:07 AM UTC-7, johannes...@lightbend.com 
wrote:
>
> Hi Jeff,
>
> your API seems quite complex. I don't know the purpose of it so I cannot 
> suggest anything but I'd try to simplify. :)
>
> That said, your problem seems to be that you cannot write a concrete type 
> that would express the dependency between the two components of the tuple 
> `(RequestBuilder, Promise[???])`. There are two ways to solve it:
>
> 1) make `Out` a type parameter and convert `sink` to a `def sink[Out]`, 
> then you can use the tuple `(RequestBuilder[Out], Promise[Out])`
> 2) create your custom tuple type that allows to express the dependency:
>
> case class BuilderWithPromise[O](builder: RequestBuilder { type Out = O }, 
> promise: Promise[O])
>
> and then use it as MergeHub.source[BuilderWithPromise[_]]
>
> But I can only repeat that getting rid of the dependent types if possible 
> is usually the best solution ;)
>
> Johannes
>
> On Thursday, July 6, 2017 at 11:23:50 PM UTC+2, Jeff wrote:
>>
>> Here is a strawman program which illustrates the issue I am having
>>
>> trait RequestBuilder {
>>   type Out
>>
>>   def complete(p: Promise[Out]): Unit
>> }
>>
>> def makeRequest(in: RequestBuilder): Source[(RequestBuilder, 
>> Promise[in.Out]), Future[in.Out]] = {
>>   val p = Promise[in.Out]
>>
>>   Source.single(in -> p).mapMaterializedValue(_ => p.future)
>> }
>>
>> val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
>>   case (r, p) => r.complete(p)
>> }).run()
>>
>> sink.runWith(makeRequest(new RequestBuilder {
>>   type Out = Int
>>
>>   def complete(p: Promise[Out]): Unit = p.success(1)
>> }))
>>
>>
>> The issue is, how do I type the Promise[???]  in the sink? I have been 
>> able to work around this by making the Promise a part of the RequestBuilder 
>> trait itself, but this seems like a code smell to me
>>
>



[akka-user] Re: [akka-streams] Generic streams and abstract types

2017-07-10 Thread Jeff
Any thoughts?

On Thursday, July 6, 2017 at 2:23:50 PM UTC-7, Jeff wrote:
>
> Here is a strawman program which illustrates the issue I am having
>
> trait RequestBuilder {
>   type Out
>
>   def complete(p: Promise[Out]): Unit
> }
>
> def makeRequest(in: RequestBuilder): Source[(RequestBuilder, 
> Promise[in.Out]), Future[in.Out]] = {
>   val p = Promise[in.Out]
>
>   Source.single(in -> p).mapMaterializedValue(_ => p.future)
> }
>
> val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
>   case (r, p) => r.complete(p)
> }).run()
>
> sink.runWith(makeRequest(new RequestBuilder {
>   type Out = Int
>
>   def complete(p: Promise[Out]): Unit = p.success(1)
> }))
>
>
> The issue is, how do I type the Promise[???]  in the sink? I have been 
> able to work around this by making the Promise a part of the RequestBuilder 
> trait itself, but this seems like a code smell to me
>



[akka-user] [akka-streams] Generic streams and abstract types

2017-07-06 Thread Jeff
Here is a strawman program which illustrates the issue I am having

trait RequestBuilder {
  type Out

  def complete(p: Promise[Out]): Unit
}

def makeRequest(in: RequestBuilder): Source[(RequestBuilder, Promise[in.Out]), Future[in.Out]] = {
  val p = Promise[in.Out]

  Source.single(in -> p).mapMaterializedValue(_ => p.future)
}

val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
  case (r, p) => r.complete(p)
}).run()

sink.runWith(makeRequest(new RequestBuilder {
  type Out = Int

  def complete(p: Promise[Out]): Unit = p.success(1)
}))


The issue is: how do I type the Promise[???] in the sink? I have been able 
to work around this by making the Promise part of the RequestBuilder 
trait itself, but this seems like a code smell to me.



[akka-user] Re: akka streams with http - downstream error propagation without collapsing the stream

2017-06-21 Thread Jeff
I am also running into a similar situation and have not found a solution 
that is satisfactory

On Monday, May 30, 2016 at 2:03:55 PM UTC-7, Matthew wrote:
>
> Hi Leslie,
>
> Did you ever find a nice pattern for this?  I ask because I'm doing 
> something very similar and am currently unhappy with the error handling 
> capabilities of Akka streams...
>
> On Monday, July 13, 2015 at 9:50:10 AM UTC-4, leslie...@googlemail.com 
> wrote:
>>
>> I've tried to implement a straightforward http flow using akka streams 
>> 1.0-RC4. It's just about getting some http/json input which is 
>> transformed/evaluated, gets some more info via akka-http, is 
>> transformed/evaluated again, and finally outputs a json http response.
>> For the optimistic case - where no errors occur - it's quite simple and 
>> straightforward.
>>
>> Thinking about the error handling propagation makes it quite complicated 
>> to me. Especially when keeping the stream running while don't swallow 
>> errors silently. They should appear in the final Sink stage wrapped into a 
>> Try or Either object. To make it worse nearly every stage could run into 
>> some kind of error/exception case.
>> I thought about some options which seam not really promising to me like:
>>
>> 1. For each stage just propagate a Try or Either object to the output, 
>> like HttpExt.superPool flow does.
>>   -would need to pass data through all stages of the stream even if the 
>> first stage failed already.
>>   -each flow stage would need a Try/Either input than in order to handle 
>> the upstream success/error case.
>>   -at least HttpExt.superPool flow stage would not accept Try/Either 
>> input so the flow needs to be forked here
>>   -it seems quite complicated to me to make every flow stage a graph with 
>> two outputs (one for success flowing into the next stage and one for error 
>> flowing into the final stage)
>>
>> 2. Exception handling:
>>   -just throwing exceptions collapses the stream which is definitely not 
>> what I want
>>   - (misuse?) the supervision decider for error handling, but I have no 
>> access to the final Sink stage here
>>   -when using decider exception handler, I would need to add it either to 
>> each stage separately or globally to the materializer (can I have on per 
>> stream or is it to heavy weight?, than this would be ok. but still no 
>> access to the Sink)
>>   -when using decider exception handler, I still would need to add 
>> exception handling code to all stages due to the request context Promise 
>> which I need to pass through all flow stages in order to fulfill the 
>> incoming request in the sink. There needs to be some way in order to pass 
>> this Promise request context to the decider.
>>
>> Currently I use the supervision decider solution. I intercept all flow 
>> stage operations. In case of exceptions I put the original exception 
>> together with the request context Promise into a special exception and let 
>> it throw to the decider where I can complete the request context Promise 
>> with failure and resume the flow.
>>
>> It looks somehow not right to me but I run out of ideas so far.
>>
>> Regards,
>> Leslie
>>
>> [ASCII flow diagram in the original message: http/json in -> several
>> processing steps -> akka http -> several processing steps -> akka http
>> -> several processing steps -> final http/json out (success or error),
>> with an "error / exception propagation?" question alongside the whole
>> chain.]
>>
>
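A minimal sketch (not from the thread) of Leslie's option 1 - wrapping each 
processing step so a failure becomes a Left that travels to the final sink 
instead of collapsing the stream; it still has the drawback Leslie mentions of 
passing every element through all stages:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.util.control.NonFatal

def step[A, B](f: A => B): Flow[Either[Throwable, A], Either[Throwable, B], NotUsed] =
  Flow[Either[Throwable, A]].map {
    case Right(a) =>
      try Right(f(a)) catch { case NonFatal(e) => Left(e) } // keep the error, keep the stream
    case Left(e) =>
      Left(e) // an earlier failure just passes through untouched
  }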


[akka-user] Re: [akka-http] Tagless Final and ToResponseMarshallable

2017-06-15 Thread Jeff
That gave me a lot of inspiration, much thanks. I've adapted that into the 
following pattern:

abstract class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F])
    extends MonadToResponseMarshaller[F] {
  import algebra._

  def route: Route =
    path("api" / "v4.0" / "restaurant" / IntNumber) { rid =>
      get {
        complete(getRestaurantById(rid).map(_.toString))
      }
    }
}

trait MonadToResponseMarshaller[F[_]] {
  implicit def monadMarshaller[A: ToResponseMarshaller]: ToResponseMarshaller[F[A]]
}

trait FutureOfOptionToResponseMarshaller { this: MonadToResponseMarshaller[FutureOfOption] =>
  import CatsMarshallers._

  def monadMarshaller[A: ToResponseMarshaller]: ToResponseMarshaller[FutureOfOption[A]] =
    implicitly
}

trait Routes {
  import cats.instances.future._

  def routes(implicit ec: ExecutionContext): Route = {
    val restaurant =
      new RestaurantRoutes(new RestaurantFutureInterpreter()) with FutureOfOptionToResponseMarshaller

    restaurant.route
  }
}

trait CatsMarshallers {
  implicit def optionTMarshaller[F[_], A, B](implicit m: Marshaller[F[Option[A]], B]): Marshaller[OptionT[F, A], B] =
    Marshaller { implicit ec => v => m(v.value) }
}

object CatsMarshallers extends CatsMarshallers

This seems fairly clean and composable, but I'm wondering if there's a way 
to prevent having to mix-in FutureOfOptionToResponseMarshaller every time.

On Tuesday, June 13, 2017 at 10:30:53 PM UTC-7, Juan José Vázquez Delgado 
wrote:
>
> I've faced a similar situation recently working on this example 
> <https://github.com/juanjovazquez/scala-petclinic>. It's not easy but is 
> definitely feasible. The bottom line is: run your own marshaller.
>
> On Tuesday, 13 June 2017 at 18:44:01 (UTC+2), Jeff wrote:
>>
>> I am trying to learn tagless final style to remove concrete monads (ie 
>> futures) from my code. I have come up with the following strawman 
>> application (using cats)
>>
>> class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) {
>>   import algebra._
>>
>>   def route: Route =
>> path("api" / "v4.0" / "restaurant" / IntNumber) { rid =>
>>   get {
>> complete(getRestaurantById(rid).map(_.toString).value)
>>   }
>> }
>> }
>>
>> object RestaurantRoutes {
>>   def apply()(implicit ec: ExecutionContext): 
>> RestaurantRoutes[FutureOfOption] = {
>> import cats.instances.future._
>>
>> new RestaurantRoutes(new RestaurantFutureInterpreter())
>>   }
>>
>> }
>>
>> the final type of 
>>
>> getRestaurantById(rid).map(_.toString).value
>>
>> is a
>>
>> Future[Option[String]]
>>
>> However, I can't seem to figure out how to get that marshalled. A marshaller 
>> exists for all those types, and had I had concrete types instead of F[_], 
>> it would have worked. What is the best way to handle this to prevent 
>> implementation details from leaking in?
>>
>> Thanks
>> Jeff
>>
>



Re: [akka-user] [akka-http] Tagless Final and ToResponseMarshallable

2017-06-13 Thread Jeff
It's not even limited to Futures - in this case, how do I marshall any 
content in this situation when the type is parameterized?

On Tuesday, June 13, 2017 at 12:50:40 PM UTC-7, Justin du coeur wrote:
>
> Good luck -- I've been doing a lot with tagless final in my Akka Sharded / 
> Persistent code, and have the beginnings of a library for that 
> <https://github.com/jducoeur/Querki/tree/master/querki/scalajvm/app/funcakka>,
>  
> but I've found that *completely* doing away with the Futures is pretty 
> challenging.  I've gotten it most of the way there, but I still have to 
> cheat occasionally...
>
> On Tue, Jun 13, 2017 at 12:44 PM, Jeff <jknigh...@gmail.com > 
> wrote:
>
>> I am trying to learn tagless final style to remove concrete monads (ie 
>> futures) from my code. I have come up with the following strawman 
>> application (using cats)
>>
>> class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) {
>>   import algebra._
>>
>>   def route: Route =
>> path("api" / "v4.0" / "restaurant" / IntNumber) { rid =>
>>   get {
>> complete(getRestaurantById(rid).map(_.toString).value)
>>   }
>> }
>> }
>>
>> object RestaurantRoutes {
>>   def apply()(implicit ec: ExecutionContext): 
>> RestaurantRoutes[FutureOfOption] = {
>> import cats.instances.future._
>>
>> new RestaurantRoutes(new RestaurantFutureInterpreter())
>>   }
>>
>> }
>>
>> the final type of 
>>
>> getRestaurantById(rid).map(_.toString).value
>>
>> is a
>>
>> Future[Option[String]]
>>
>> However, I can't seem to figure out how to get that marshalled. A marshaller 
>> exists for all those types, and had I had concrete types instead of F[_], 
>> it would have worked. What is the best way to handle this to prevent 
>> implementation details from leaking in?
>>
>> Thanks
>> Jeff
>>



[akka-user] [akka-http] Tagless Final and ToResponseMarshallable

2017-06-13 Thread Jeff
I am trying to learn tagless final style to remove concrete monads (ie 
futures) from my code. I have come up with the following strawman 
application (using cats)

class RestaurantRoutes[F[_]: Monad](algebra: RestaurantAlgebra[F]) {
  import algebra._

  def route: Route =
path("api" / "v4.0" / "restaurant" / IntNumber) { rid =>
  get {
complete(getRestaurantById(rid).map(_.toString).value)
  }
}
}

object RestaurantRoutes {
  def apply()(implicit ec: ExecutionContext): 
RestaurantRoutes[FutureOfOption] = {
import cats.instances.future._

new RestaurantRoutes(new RestaurantFutureInterpreter())
  }

}

the final type of 

getRestaurantById(rid).map(_.toString).value

is a

Future[Option[String]]

However, I can't seem to figure out how to get that marshalled. A marshaller 
exists for all those types, and had I had concrete types instead of F[_], 
it would have worked. What is the best way to handle this to prevent 
implementation details from leaking in?

Thanks
Jeff



[akka-user] Akka Dispatchers that fail fast instead of enqueuing?

2016-10-22 Thread Jeff Henrikson
Hello akka-user list,

I have had a recent experience of being surprised by the APIs in akka 
surrounding streaming HTTP.  I would like some feedback on the solution that I 
have converged on.  I'll begin with the use case:

- Modules akka-actor, akka-stream, and akka-http-core are used to build a 
streaming data application.
- The application ingests data via a purpose-built agent running on an upstream 
service.  Upstream agent pushes HTTP POST requests encoded, for efficiency, in 
raw bytes.
- Latency between upstream agent and service is low, so only a few inbound TCP 
connections will saturate the 10Gb/s NIC.
- In order to ensure at least once delivery, agent is configured to retry all 
POSTS for which the service does not respond with status 200 (OK).
- In order to ensure at least once delivery, service is configured to sync disk 
before responding with HTTP status 200 (OK).
- Parallel instances of the agent partition the data before transmission.

Given these constraints, it becomes clear that we need to limit the number of 
inbound connections.  Only a few are necessary to saturate the connection, and 
more only increase the CPU and memory used.  As an implementation that makes 
sense to me, HTTP status 503 can be issued to the agent upstream when maximum 
parallelism has been reached.  In order to minimize disorder, the agent should 
be notified as soon as possible of the busy node so the agent can proceed with 
retry to a different node.

Note that since the input is streamed to disk, the akka-streams 
ActorMaterializer as well as the HTTP actor must have their parallelism limited.  
Once the ActorMaterializer gets a flow started, it seems tricky to get it to stop 
efficiently.

When I began with the problem, it seemed very likely that one of the many 
dispatcher implementations could provide this behavior.  There is much 
language about "bounded mailboxes" and the like.  My three attempts to solve this 
problem went like this:

1) Try to find some Typesafe config, .withDispatcher and related calls to bound 
the parallelism of the actor and ActorMaterializer.  This started taking a long 
time, and all configurations found were enqueuing requests instead of failing.

2) I wrote a custom subclass of ThreadPoolExecutor with a hard maximum thread 
count.  I wrapped it with ExecutionContext.fromExecutor and called my HTTP 
handler from it.

This approach failed when I realized that even though the actor and 
ActorMaterializer were gated by my counter, my counter was decremented too 
soon.  Remember, I only need, say, 3 connections, so 3 threads are very 
cheap.  I had hoped Await.result could sit and keep a thread running for a 
connection.  But as it turns out, even though all the non-blocking bloggers out 
there say not to call Await.result, I couldn't get Await.result or any related 
API to hold a thread!  It must be doing something clever...

3) Finally, I abandoned the Dispatcher and ExecutionContext APIs entirely, in 
favor of a simple Future:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

class FutureSemaphore(numConcurrentMax: Int, ec: ExecutionContext) {
  implicit val _ec = ec
  private var numConcurrent = 0

  // Returns None (fail fast) when the maximum number of in-flight futures is reached;
  // otherwise starts the supplied future and releases the slot when it completes.
  def submit[U](fnf: Unit => Future[U]): Option[Future[U]] = {
    val acquired = synchronized {
      // check and increment in one synchronized block, otherwise two callers can both pass the check
      if (numConcurrent < numConcurrentMax) { numConcurrent += 1; true } else false
    }
    if (acquired) {
      val f = fnf(())
      val g = Promise[U]()
      f.onComplete { (t: Try[U]) =>
        synchronized { numConcurrent -= 1 }  // release the slot
        g.complete(t)
      }
      Some(g.future)
    } else None
  }
}
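
For illustration, a sketch of how the FutureSemaphore above might be wired into a
request handler so that rejected submissions turn into fast 503 responses; handle,
handleRequest and futureSemaphore are invented names, not part of the original code.

import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import scala.concurrent.Future

// Hypothetical wiring: handleRequest stands in for the real streaming-to-disk work.
val futureSemaphore = new FutureSemaphore(numConcurrentMax = 3, ec = scala.concurrent.ExecutionContext.global)
def handleRequest(request: HttpRequest): Future[HttpResponse] = ???

def handle(request: HttpRequest): Future[HttpResponse] =
  futureSemaphore.submit(_ => handleRequest(request)) match {
    case Some(response) => response   // slot acquired, request proceeds
    case None => Future.successful(HttpResponse(StatusCodes.ServiceUnavailable))  // busy: fail fast with 503
  }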
While the code is small and seems to work as advertised, I am still confused 
about why this took so long to figure out.  Did I miss something in the 
documentation?

Here is the reference documentation:
3.4 Dispatchers

An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of 
the machine so to speak. All MessageDispatcher implementations are also an 
ExecutionContext, which means that they can be used to execute arbitrary code, 
for instance Futures. 

If it is true that dispatchers cannot bound parallelism, that seems like basic 
design information and the reference should say so.  If, alternatively, some 
clever configuration of a dispatcher subclass can bound parallelism and fail 
fast, then that configuration should be shown.  Maybe I missed it.  I dunno.

Thanks in advance for any insights.

Regards,


Jeff Henrikson












[akka-user] [akka-streams] Graph and Stream of Streams

2015-12-14 Thread Jeff
I am working my way through the websocket example and have a question about 
nested streams. Currently I have this: 

import akka.actor.ActorRef
import akka.http.scaladsl.model.ws.{BinaryMessage, TextMessage, Message}
import akka.stream.{OverflowStrategy, FlowShape}
import akka.stream.scaladsl.{Sink, Source, FlowGraph, Flow}

object ChatFlow {
  def apply(userId: Long, channel: Long, rabbitActor: ActorRef): Flow[Message, 
Message, ActorRef] =
Flow.fromGraph(FlowGraph.create(Source.actorRef[RabbitActor.Message](5, 
OverflowStrategy.fail)) { implicit b =>
  rabbitSource =>
import FlowGraph.Implicits._

// for now, ignore all content coming from the websocket
val fromWebsocket = b.add(Sink.foreach[Message] {
  case TextMessage.Strict(content) => println(content)
//  case bm: BinaryMessage => bm.dataStream.runWith(Sink.ignore)
})

// translate rabbit messages into socket messages
val toWebSocket = b.add(Flow[RabbitActor.Message] map {
  case _ => TextMessage(s"test")
})

// when this stream is materialized, register it with rabbitmq
val websocketMaterialized = b.materializedValue map { actor =>
  RabbitActor.UserJoined(userId, channel, actor)
}

val rabbitMQSink = Sink.actorRef[RabbitActor.Message](rabbitActor, 
RabbitActor.UserLeft(userId, channel))

websocketMaterialized ~> rabbitMQSink
rabbitSource ~> toWebSocket

FlowShape(fromWebsocket.inlet, toWebSocket.outlet)
})
}


My question is about the commented out BinaryMessage. bm.dataStream is a 
Source, and the docs say I should drain this stream to clear the socket. 
How would I do that in this scenario without passing in a materializer?



[akka-user] Re: Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly

2015-08-14 Thread Jeff Simpson
Any solutions to this one yet? I'm getting the same error in a Play 2.4 
app. During Global.onStop I call my subscriber's onComplete function, but I 
still see the "Processor actor terminated abruptly" error. Any ideas?

On Thursday, April 30, 2015 at 2:55:32 PM UTC-6, Samuel Tardieu wrote:

 Hi.

 With akka http 1.0-RC2 (it was similar in 1.0-RC1), one of my programs signals 
 an intermittent error that I do not understand.

 The context: one HTTP GET request is sent out, the JSON response is 
 properly received, decoded and acted upon, and then the system terminates 
 with (system being my ActorSystem):

 Await.ready(Http().shutdownAllConnectionPools(), 5.seconds)
 system.shutdown()

 The error only happens from time to time (I’d say once every three runs). 
 I have activated akka.actor.debug.{receive,autoreceive,lifecycle,fsm}.

 Any idea of what could be the cause?

 INFO] [04/30/2015 22:36:01.916] [run-main-0] [ActorSystem(default)] 
 Initiating orderly shutdown of all active host connections pools...
 [DEBUG] [04/30/2015 22:36:01.921] [default-akka.actor.default-dispatcher-8] 
 [akka://default/system/deadLetterListener] stopped
 [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] 
 [akka://default/user/$a/flow-2-3-publisherSource-processor-mapConcat] stopped
 [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-6] 
 [akka://default/user/$a/flow-2-9-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.retryMerge-flexiMerge]
  stopped
 [DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] 
 [akka://default/user/$a/flow-2-2-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] 
 [akka://default/user/$a/flow-2-4-publisherSource-Merge] stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-14] 
 [akka://default/user/$a/flow-2-1-publisherSource-Merge] stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-10] 
 [akka://default/user/$a/flow-2-11-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.RetrySplit-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] 
 [akka://default/user/$a/flow-2-12-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-14] 
 [akka://default/user/$a/flow-2-14-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-10] 
 [akka://default/user/$a/flow-2-5-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-12] 
 [akka://default/user/$a/flow-2-13-publisherSource-processor-mapConcat] stopped
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-14] 
 [akka://default/user/$a/flow-2-15-publisherSource-processor-mapConcat] stopped
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-10] 
 [akka://default/user/$a/flow-2-6-publisherSource-processor-mapConcat] stopped
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-3] 
 [akka://default/user] stopping
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-4] 
 [akka://default/user/SlotProcessor-0] stopped
 [DEBUG] [04/30/2015 22:36:01.924] [default-akka.actor.default-dispatcher-14] 
 [akka://default/user/SlotProcessor-3] stopped
 [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-10] 
 [akka://default/user/SlotProcessor-1] stopped
 [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-12] 
 [akka://default/user/SlotProcessor-2] stopped
 [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-12] 
 [akka://default/user/$a/flow-3-16-actorPublisherSource-actorSubscriberSink] 
 stopped
 [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-13] 
 [akka://default/user/$a/flow-2-8-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.Route-flexiRoute]
  stopped
 [DEBUG] [04/30/2015 22:36:01.926] [default-akka.actor.default-dispatcher-11] 
 [akka://default/user/$a/flow-2-7-publisherSource-PoolConductor.retryMerge-flexiMerge-mapConcat]
  stopped
 [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-8] 
 [akka://default/user/PoolInterfaceActor-0] stopped
 [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-9] 
 [akka://default/user/$a/flow-2-10-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.SlotSelector-flexiMerge]
  stopped
 [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-14] 
 [akka://default/user/$a/flow-3-0-actorPublisherSource-actorPublisherSource] 
 stopped
 [DEBUG] [04/30/2015 22:36:01.928] [default-akka.actor.default-dispatcher-9] 
 

Re: [akka-user] Promises and message passing

2015-07-16 Thread Jeff
If everything is local and no serialization is required, would it be worth 
it? The documentation on the website indicates that Ask has a 
performance cost associated with it.
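
For illustration, a minimal sketch of the pattern under discussion: handing a
Promise to a local actor in a message and completing it inside receive. Lookup and
Worker are invented names, and this only works in-process, since promises do not
serialize.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.{Future, Promise}

// Message carrying the promise to complete.
final case class Lookup(id: Long, result: Promise[String])

class Worker extends Actor {
  def receive = {
    case Lookup(id, result) =>
      result.success(s"value-for-$id")   // complete the caller's future directly, no ask/PromiseActorRef
  }
}

val system = ActorSystem("example")
val worker: ActorRef = system.actorOf(Props[Worker], "worker")

val p = Promise[String]()
worker ! Lookup(42, p)
val f: Future[String] = p.future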

On Wednesday, July 15, 2015 at 8:02:57 PM UTC-7, √ wrote:

 Because you can't serialize a promise (if the actor decides to send the 
 message to another node)

 On Thu, Jul 16, 2015 at 4:51 AM, Jeff jknigh...@gmail.com javascript: 
 wrote:

 Is there any reason why I wouldn't want to pass a Promise to a (local) 
 ActorRef to be resolved? I know the Ask pattern exists, but I would like to 
 avoid the cost of having to allocate PromiseActorRefs.

 Thanks
 Jeff





 -- 
 Cheers,
 √
  



[akka-user] Promises and message passing

2015-07-15 Thread Jeff
Is there any reason why I wouldn't want to pass a Promise to a (local) 
ActorRef to be resolved? I know the Ask pattern exists, but I would like to 
avoid the cost of having to allocate PromiseActorRefs.

Thanks
Jeff



[akka-user] Tracking down Future timeout failures in a cluster under load

2015-07-01 Thread Jeff Pennal
Hello,

I am currently in the process of evaluating Akka. I have successfully built 
a proof of concept REST API using akka-http which is part of an Akka 
cluster system that has a number of Actors in it.

Everything works well and I'm happy with what I am seeing until I try to 
ramp up the load on the REST API server; then I get the server to a 
point where the requests start failing with a "Future timed out" message.

My problem is that I am not sure how to find out *why* and *where* the 
requests are timing out.

As I understand Akka, the timeout is simply a scheduler in the Future 
saying that I have exceeded the amount of time that I should wait for the 
Actor to return something. As such I am not sure how I would be able to 
find out at exactly what point the Actor I called was in processing my 
request when the timeout occurred.
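
As one possible diagnostic (a sketch, not an established recipe): wrapping each
downstream ask with its own timing and logging at least shows which hop in the
request path the timeout belongs to. timedAsk and stepName are invented names.

import akka.actor.ActorRef
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

def timedAsk(stepName: String, target: ActorRef, msg: Any)
            (implicit timeout: Timeout, ec: ExecutionContext): Future[Any] = {
  val started = System.nanoTime()
  (target ? msg).andThen {
    case Failure(e: AskTimeoutException) =>
      // the elapsed time and step name tell you which step in the request path stalled
      println(s"$stepName timed out after ${(System.nanoTime() - started) / 1000000} ms")
  }
}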

Can anyone give me some advice on how I should be tracking down this issue?

Thanks in advance,
Jeff



Re: [akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause

2015-06-16 Thread Jeff
So if my understanding is correct, expected-response-after and 
acceptable-heartbeat-pause do the exact same thing; the only difference is 
when they kick in. expected-response-after is for the first heartbeat, and 
acceptable-heartbeat-pause is for every subsequent heartbeat. Is my 
understanding correct?
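
For reference, a sketch of where the two settings live (paths as I recall them for
the cluster failure detector in Akka 2.3.x; the defaults shown are approximate, so
check the bundled reference.conf):

import com.typesafe.config.ConfigFactory

val tuned = ConfigFactory.parseString(
  """
  akka.cluster.failure-detector {
    expected-response-after    = 5 s   # grace period while waiting for the first heartbeat reply
    acceptable-heartbeat-pause = 3 s   # tolerated pause between subsequent heartbeats
  }
  """)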

Thanks
Jeff

On Monday, June 15, 2015 at 7:40:35 AM UTC-7, Patrik Nordwall wrote:

 expected-response-after is used for starting failure detection when the 
 first heartbeat message is not received as expected, e.g. when connection 
 is broken from the beginning.

 acceptable-heartbeat-pause is explained here: 
 http://doc.akka.io/docs/akka/2.3.11/scala/cluster-usage.html#Failure_Detector

 /Patrik

 On Fri, Jun 5, 2015 at 12:41 AM, Jeff jknigh...@gmail.com javascript: 
 wrote:

 Can somebody enlighten me on how these two settings interact? These two 
 settings seem to overlap.

 Thanks
 Jeff





 -- 

 Patrik Nordwall
 Typesafe http://typesafe.com/ -  Reactive apps on the JVM
 Twitter: @patriknw

  



Re: [akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause

2015-06-16 Thread Jeff
Sure, ticket is here https://github.com/akka/akka/issues/17750

On Tuesday, June 16, 2015 at 12:51:56 PM UTC-7, Patrik Nordwall wrote:



 On Tue, Jun 16, 2015 at 8:51 PM, Jeff jknigh...@gmail.com javascript: 
 wrote:

 So if my understanding is correct, expected-response-after and 
 acceptable-heartbeat-pause do the exact same thing, the only difference is 
 when they kick in. expected-response-after is for the first heartbeat, and 
 acceptable-heartbeat-pause is for every subsequent heartbeat. Is my 
 understanding correct?


 expected-response-after is added on top of the acceptable-heartbeat-pause 
 for the first heartbeat. Now when we discuss it I can't really find a good 
 motivation for that delay, at least not as long as the default 5 seconds. 
 Perhaps it is a leftover from when we used one-way heartbeating, instead of 
 current request-response heartbeating.

 Would you mind creating a github issue 
 https://github.com/akka/akka/issues and we will investigate if that 
 delay can be removed, or reduced.
  
 Thanks,
 Patrik


 Thanks
 Jeff

 On Monday, June 15, 2015 at 7:40:35 AM UTC-7, Patrik Nordwall wrote:

 expected-response-after is used for starting failure detection when the 
 first heartbeat message is not received as expected, e.g. when connection 
 is broken from the beginning.

 acceptable-heartbeat-pause is explained here: 
 http://doc.akka.io/docs/akka/2.3.11/scala/cluster-usage.html#Failure_Detector

 /Patrik

 On Fri, Jun 5, 2015 at 12:41 AM, Jeff jknigh...@gmail.com wrote:

 Can somebody enlighten me on how these two settings interact? These two 
 settings seem to overlap.

 Thanks
 Jeff





 -- 

 Patrik Nordwall
 Typesafe http://typesafe.com/ -  Reactive apps on the JVM
 Twitter: @patriknw





 -- 

 Patrik Nordwall
 Typesafe http://typesafe.com/ -  Reactive apps on the JVM
 Twitter: @patriknw

  



[akka-user] [akka-cluster] expected-response-after and acceptable-heartbeat-pause

2015-06-04 Thread Jeff
Can somebody enlighten me on how these two settings interact? These two 
settings seem to overlap.

Thanks
Jeff



[akka-user] [akka-streams 1.0-RC3] ActorSubscriber instances aren't shut down

2015-06-04 Thread Jeff Wartes


I took a page out of the stream-integrations documentation, and hooked a 
simple ActorSubscriber up as a sink to a Flow. Something like:

class TestActorSubscriber extends ActorSubscriber {
  override def postStop() = {
    println("Stopped " + self.toString)
    super.postStop()
  }
  override def preStart() = {
    println("Started " + self.toString)
    super.preStart()
  }
  override protected def requestStrategy: RequestStrategy = // some stuff
  override def receive: Receive = {
    case OnNext(x) => // some stuff
  }
}

implicit val mat = ActorFlowMaterializer()
val runnableFlow = Source(() => io.Source.fromFile("foo.txt").getLines())
  .to(Sink.actorSubscriber(Props(new TestActorSubscriber())))
runnableFlow.run()


Since the Sink takes a Props, and not an ActorRef, I assume the 
ActorSystemMaterializer is responsible for the lifecycle of that 
ActorSubscriber instance. When I run this flow, things work fine - the 
TestActorSubscriber is created (preStart fires), the messages stream 
through. 
However, the actor instance never stops once the iterator/Source completes. 
I can verify the OnComplete message gets to the TestActorSubscriber, but I 
never see the postStop fire.

Looking at the ActorSubscriber source, I see that it's designed to stop the 
actor if there's an attempt to create a subscription after an OnComplete is 
received. 
It doesn't look like anything happens in the normal case though, where the 
OnComplete/OnError is delivered *after* a subscription has already been 
established.

Is this just a bug? How is the lifecycle of ActorSubscriber/ActorPublisher 
instances managed?
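
For comparison, a hedged sketch of a subscriber that stops itself explicitly when
the stream terminates, on the assumption that ActorSubscriber leaves that decision
to the actor rather than doing it automatically. SelfStoppingSubscriber is a
made-up name.

import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

class SelfStoppingSubscriber extends ActorSubscriber {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  def receive = {
    case OnNext(element) => // process the element
    case OnComplete      => context.stop(self)   // stop explicitly when the stream completes
    case OnError(cause)  => context.stop(self)   // and likewise when it fails
  }
}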



[akka-user] Disambiguate actorRefs

2015-05-26 Thread Jeff
I have an actor that I need to pass several actorRefs into (remote 
services). Is there a best practice for doing this with type safety (so a 
user doesn't accidentally pass the wrong actorRef)? Some options I've 
considered are using scalaz Tags and native scala Value Classes. 
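
For illustration, a sketch of the value-class option mentioned above;
UserServiceRef, BillingServiceRef and Coordinator are invented names.

import akka.actor.{Actor, ActorRef}

// Lightweight wrappers: passing the wrong ref becomes a compile error at the call site.
final case class UserServiceRef(ref: ActorRef) extends AnyVal
final case class BillingServiceRef(ref: ActorRef) extends AnyVal

class Coordinator(users: UserServiceRef, billing: BillingServiceRef) extends Actor {
  def receive = {
    case msg => users.ref ! msg   // forwarding to the user service; billing cannot be confused with it
  }
}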



Re: [akka-user] Akka Receive as parameter

2015-05-26 Thread Jeff
This is great advice, thanks!

On Saturday, May 23, 2015 at 1:58:29 AM UTC-7, rkuhn wrote:

 Using something like Spores you could ascertain that your partial function 
 does indeed not close over its surrounding Actor when creating anonymous 
 Actors inline, but Spores are still an experiment. An equally safe but not 
 as elegant solution is to declare the partial function in the Actor’s 
 companion object and pass the needed values into the method that 
 instantiates it (just like the `def props(...): Props` recommendation).
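
A rough sketch of the companion-object approach described above; Greeter and
greeting are made-up names.

import akka.actor.{Actor, Props}

object Greeter {
  def props(greeting: String): Props = Props(new Greeter(behavior(greeting)))

  // Built in the companion object, so it can only close over what is passed in,
  // never over a surrounding actor's context.
  def behavior(greeting: String): Actor.Receive = {
    case name: String => println(s"$greeting, $name")
  }
}

class Greeter(behavior: Actor.Receive) extends Actor {
  def receive = behavior
}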

 Regards,

 Roland

 On 22 May 2015 at 23:57, Jeff jknigh...@gmail.com wrote:

 Following up on this question, what are the best practices around creating 
 anonymous actors, as long as you are not closing over context of a parent 
 actor?

 On Friday, May 22, 2015 at 12:08:45 PM UTC-7, Jeff wrote:

 Is it bad practice to pass in the Receive pf to an actor as part of the 
 constructor arguments, assuming all vals it closes over are consts?






 *Dr. Roland Kuhn*
 *Akka Tech Lead*
 Typesafe http://typesafe.com/ – Reactive apps on the JVM.
 twitter: @rolandkuhn
 http://twitter.com/#!/rolandkuhn
  




[akka-user] Re: Akka Receive as parameter

2015-05-22 Thread Jeff
Following up on this question, what are the best practices around creating 
anonymous actors, as long as you are not closing over context of a parent 
actor?

On Friday, May 22, 2015 at 12:08:45 PM UTC-7, Jeff wrote:

 Is it bad practice to pass in the Receive pf to an actor as part of the 
 constructor arguments, assuming all vals it closes over are consts?




[akka-user] Akka Receive as parameter

2015-05-22 Thread Jeff
Is it bad practice to pass in the Receive pf to an actor as part of the 
constructor arguments, assuming all vals it closes over are consts?



[akka-user] system.scheduler.scheduleOnce consumes Heap over time

2015-05-12 Thread Jeff Steinmetz
I thought there was something specific I was doing wrong in an application 
I have deployed in development, which eventually needs to be production 
ready.

Over time, the Akka actor system slowly consumes all available memory.  I 
assumed it was a memory leak that I introduced, but as it turns out, even 
the sample Typesafe hello-akka app demonstrates the same heap consumption.

http://www.typesafe.com/activator/template/hello-akka


If you let the sample application above run for a very long time, the heap 
usage slowly grows and is never released.  It is very slow, though: 
since it is set to send a message to the greeter only every 1 second, 
this isn't very apparent.  

I simplified the pattern I am using in an actual application, that 
demonstrates the same issue, but at an accelerated rate.
This is modeled after the Scheduling Periodic Messages pattern described 
here:

http://doc.akka.io/docs/akka/snapshot/scala/howto.html
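
For context, a rough sketch of that re-scheduling pattern as I recall it from the
HowTo page; Tick and Ticker are made-up names.

import akka.actor.Actor
import scala.concurrent.duration._

case object Tick

class Ticker extends Actor {
  import context.dispatcher

  override def preStart(): Unit =
    context.system.scheduler.scheduleOnce(500.millis, self, Tick)

  def receive = {
    case Tick =>
      // do the periodic work, then schedule the next tick
      context.system.scheduler.scheduleOnce(1.second, self, Tick)
  }
}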


The full sample code to recreate the memory consumption, with heap usage 
logging is available at the gist below.  (note: if you start with the 
hello-akka sample application, and replace HelloAkkaScala.scala with this 
gist below, you can reproduce.  You will need to add to your build.sbt the 
following (as mentioned in the comments at the top of the gist)

  "com.kenshoo" %% "metrics-play" % "2.3.0_0.1.9"

https://gist.github.com/jeffsteinmetz/bbccb4815858620ab5a2

You may need to let it run for about 1 to 2 minutes before you see the 
initial bump in heap usage.  After that, you will see it slowly climb and 
never come back down.
Note: the metrics calls are not required and do not cause the memory leak 
- you can watch system memory without this library and see the same issue.

I've tried a few versions of Akka, with the same results.  In production, a 
similar (real world) application eventually consumes all memory.

Hoping for a little insight on the Akka internals, and potential solution.

J







[akka-user] Re: Debugging Akka Cluster

2015-05-07 Thread Jeff
Having the debugging profile only suspend the current thread helped a lot, 
thanks!

On Thursday, May 7, 2015 at 1:23:41 AM UTC-7, 何品 wrote:

 You could just pause the current thread, not all of them. The Cursive debugger 
 can do that.

 On Thursday, 7 May 2015 at 10:50:41 AM UTC+8, Jeff wrote:

 Every time I set a breakpoint in an actor involved with Actor Clustering, 
 the phi detector triggers and marks that node as Unreachable. Is there a 
 way to prevent this during debugging? 





[akka-user] Debugging Akka Cluster

2015-05-06 Thread Jeff
Every time I set a breakpoint in an actor involved with Actor Clustering, 
the phi detector triggers and marks that node as Unreachable. Is there a 
way to prevent this during debugging? 



[akka-user] withAttributes scope

2015-04-24 Thread Jeff
What is the scope of .withAttributes()? Does it only apply to the stage 
that it is called on, or every stage up to that point? For Example

Sink.empty.transform(() => ...).transform(() => ...)
  .withAttributes(ActorOperationAttributes.dispatcher("test-dispatcher"))

Are all the transformation stages run under test-dispatcher, or just the 
last one? If I only want to apply the attributes to a single stage, should 
I use .via()?
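
For illustration, a sketch of the .via() variant mentioned in the question,
confining the dispatcher attribute to one wrapped stage. The stage contents and
dispatcher name are made up, and ActorOperationAttributes is the 1.0-era name of
what later became ActorAttributes.

import akka.stream.ActorOperationAttributes
import akka.stream.scaladsl.{Flow, Sink}

// Only the stage(s) inside this wrapped flow carry the attribute; the rest keep their defaults.
val blockingStage = Flow[Int]
  .map { i => /* blocking work here */ i }
  .withAttributes(ActorOperationAttributes.dispatcher("test-dispatcher"))

val graph = Flow[Int].map(_ + 1).via(blockingStage).to(Sink.ignore)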

Thanks
Jeff



Re: [akka-user] Cluster router significantly outperforms remote router

2015-04-13 Thread Jeff
I have updated the ticket with the config I am using. 

On Friday, April 10, 2015 at 2:52:47 AM UTC-7, Akka Team wrote:

 I added a ticket: https://github.com/akka/akka/issues/17171

 On Fri, Apr 10, 2015 at 10:23 AM, Endre Varga endre...@typesafe.com 
 javascript: wrote:

 Wow! This is an interesting finding, we need to investigate. 

 -Endre

 On Fri, Apr 10, 2015 at 3:38 AM, Jeff jknigh...@gmail.com javascript: 
 wrote:

 I have been able to isolate the code that is running differently between 
 the two different actor providers. Below is the receive of the 
 offending actor:

 var now: Long = _

 def receive = {
   case UserMessage.LookupOrgsResponse(_, uo) =>
     now = new Date().getTime

     db ! OrgMessage.BatchLookupByIds(uo.map(_.orgId))
     db ! PersonMessage.BatchLookupById(uo.map(_.personId))

   case OrgMessage.BatchLookupResponse(os) =>
     println(s"orgs took ${new Date().getTime - now} ms")
     orgs = Some(os); process

   case PersonMessage.BatchLookupByIdResponse(ps) =>
     println(s"persons took ${new Date().getTime - now} ms")
     persons = Some(ps); process
 }

 When running with provider = "akka.remote.RemoteActorRefProvider", I 
 get the following times: 

 persons took 15 ms
 orgs took 57 ms

 orgs took 13 ms
 persons took 55 ms

 orgs took 15 ms
 persons took 57 ms

 orgs took 19 ms
 persons took 61 ms

 orgs took 12 ms
 persons took 52 ms

 persons took 27 ms
 orgs took 68 ms

 orgs took 14 ms
 persons took 55 ms

 And when I run it with provider = "akka.cluster.ClusterActorRefProvider" 
 I get: 

 orgs took 12 ms
 persons took 14 ms

 orgs took 15 ms
 persons took 20 ms

 persons took 13 ms
 orgs took 15 ms

 orgs took 12 ms
 persons took 16 ms

 orgs took 10 ms
 persons took 12 ms

 Again, nothing in the business logic is changing. I am simply changing 
 the provider in the application.conf and making the routers cluster aware. 
 These results are consistent on a warmed jvm. 


 On Thursday, April 9, 2015 at 12:10:38 AM UTC-7, drewhk wrote:

 Hi Jeff,

 On Thu, Apr 9, 2015 at 7:59 AM, Jeff jknigh...@gmail.com wrote:

 I am building an actor system that basically has a topology of spray 
 -> business logic actor -> slick. I initially built these using akka 
 remote, but have recently switch over to akka cluster. In testing these 
 systems, I have noticed that round trip calls through the system take 
 half 
 the time using akka cluster as they did using akka remote. Literally, the 
 only thing changed between benchmarks is the actor provider. Why would 
 akka 
 cluster provider perform so much better on identical workloads? I was 
 under 
 the impression that akka cluster built on top of akka remote.


 This is indeed strange. ClusterActorRefProvider actually extends 
 RemoteActorRefProvider: https://github.com/akka/akka/blob/
 8485cd2ebb46d2fba851c41c03e34436e498c005/akka-cluster/src/
 main/scala/akka/cluster/ClusterActorRefProvider.scala#L26

 Do you have a small example that demonstrates this? It would be 
 interesting to investigate what is the source of difference. Maybe 
 deathwatch, I don't know.

 -Endre
  





 -- 
 Akka Team

[akka-user] Re: [akka-stream] Props Source ignoring dispatcher

2015-04-13 Thread Jeff
This is using 1.0-M5

On Monday, April 13, 2015 at 9:32:38 PM UTC-7, Jeff wrote:

 I am creating an ActorPublisher to encapsulate a kafka consumer. I am 
 trying to bulkhead the actor behind a custom dispatcher (since the kafka 
 consumer is blocking) with the following code:

 val in = Source[Array[Byte]](KafkaConsumerActor.props(consumer.createMessageStreamsByTopic(topic).head)
   .withDispatcher("kafka-consumer-dispatcher"))

 However, when I set a breakpoint in the receive method and inspect the 
 context, the dispatcher is set to default-dispatcher


 https://lh3.googleusercontent.com/-GSsCrdUDXtc/VSyYFsKunFI/BPM/OQ8Q9tdqGbA/s1600/Screen%2BShot%2B2015-04-13%2Bat%2B9.28.27%2BPM.png





[akka-user] [akka-stream] Props Source ignoring dispatcher

2015-04-13 Thread Jeff


I am creating an ActorPublisher to encapsulate a kafka consumer. I am 
trying to bulkhead the actor behind a custom dispatcher (since the kafka 
consumer is blocking) with the following code:

val in = Source[Array[Byte]](KafkaConsumerActor.props(consumer.createMessageStreamsByTopic(topic).head)
  .withDispatcher("kafka-consumer-dispatcher"))

However, when I set a breakpoint in the receive method and inspect the 
context, the dispatcher is set to default-dispatcher

https://lh3.googleusercontent.com/-GSsCrdUDXtc/VSyYFsKunFI/BPM/OQ8Q9tdqGbA/s1600/Screen%2BShot%2B2015-04-13%2Bat%2B9.28.27%2BPM.png



Re: [akka-user] akka-persistence vs durable mailbox

2014-07-08 Thread Jeff Simpson
The main difference I see with the change from a DurableMailbox to using a 
PersistentChannel/AtLeastOnceDelivery is the former seems pull based 
while the latter is push. Is this the case? I'm curious if there's a way 
I can essentially implement a durable mailbox based on the Persistence 
framework?  Basically I want to pile a bunch of messages into this mailbox 
and have the actor process them when it can with a producer timeout 
restriction. I know this is related to the Reactive Stream stuff, but I'm 
interested in something near term and deciding if I should implement a 
DurableMailbox or spend more time trying to bend Persistence to work with 
my stateless actor.

Thanks
-Jeff

On Monday, March 24, 2014 8:53:23 AM UTC-6, Patrik Nordwall wrote:

 The big difference is that when using akka-persistence the messages are 
 not stored immediately when placed in the mailbox. When you receive the 
 Persistent message in the Processor you know that it has been stored. The 
 sender can let go of the responsibility when it has received an explicit 
 acknowledgement message from the Processor.

 A safe hand-off like that can also be implemented with a 
 PersistentChannel. This is rather close to a durable mailbox, but so much 
 better.

 class Endpoint extends Actor {
   val channel = context.actorOf(PersistentChannel.props(
     PersistentChannelSettings(redeliverInterval = 3.seconds, redeliverMax = 10,
       replyPersistent = true)), name = "myChannel")
   val destination = context.system / "jobManager"

   import context.dispatcher
   implicit val timeout = Timeout(5.seconds)

   def receive = {
     case job: Job =>
       (channel ? Deliver(Persistent(job), destination)) map {
         case _: Persistent => "OK: " + job.id
       } recover {
         case e => "FAILED: " + job.id
       } pipeTo sender()
   }
 }

 Cheers,
 Patrik



 On Mon, Mar 24, 2014 at 3:22 PM, Akka Team akka.o...@gmail.com 
 javascript: wrote:

 Hi Logan,


  I took a look into the akka-persistence documentation and from what I 
 can gather the goal of the project is to store processed messages so that 
 an actor's state can be rebuilt upon an actor's restart, while I'd like to 
 store unprocessed messages so they can be processed upon an actor's restart 
 (for my purposes, my actor is essentially stateless).


 Processors (
 http://doc.akka.io/docs/akka/2.3.0/scala/persistence.html#Processors) 
 store the message *first* and then process them. If you do not want to keep 
 the whole history of the Processor in the journal you can delete messages: 
 http://doc.akka.io/docs/akka/2.3.0/scala/persistence.html#Message_deletion

 -Endre
  


 Am I understanding this incorrectly? Is there a way to accomplish my 
 goal with the persistence module, or is there an alternative approach?

 Thanks!





 -- 
 Akka Team
 Typesafe - The software stack for applications that scale
 Blog: letitcrash.com
 Twitter: @akkateam
  




 -- 

 Patrik Nordwall
 Typesafe http://typesafe.com/ -  Reactive apps on the JVM
 Twitter: @patriknw

  



Re: [akka-user] Daemonic Akka creating non-daemon threads

2014-01-15 Thread Jeff Crilly
Thx for the feedback!

Re outliving... Afaict, the app just stops getting connections, so it's 
hard to tell there is a problem from the inside.  (Fwiw, this app also 
connects to other systems which might be down; hence the app needs to deal 
with failure and recover those connections -- simply exiting the app is not 
an option in those scenarios.)

As for "periodically"... good question. One thing I'm asking for is the 
rate of the leak.  It is not on the order of minutes, more like days, 
and may actually be weeks (I know this sounds odd, but I don't have direct 
access to the production systems and need to go through a couple of layers of 
IT folks in different geos and timezones).

Anyhow, I just wanted to check on the options/alternatives that might not 
be obvious.  I'm a fan of getting on the latest stable (but not bleeding 
edge).

-jeff
 


On Wednesday, January 15, 2014 2:25:09 AM UTC-8, Björn Antonsson wrote:

 Hi Jeff,

 On Tuesday, 14 January 2014 at 03:54, Jeff Crilly wrote:

 We've had some recent production issues and it looks like this is the 
 exact issue biting us (this results in a need to restart our app servers 
 rather periodically).
 The app is on Akka 2.1.4.


 I’m just curious as to what you mean by periodically. Also, why does your 
 app server outlive the actor systems created inside it? You shouldn’t need 
 to start/stop actor systems frequently.

 I scanned the akka 2.1.x to 2.2.x migration guide a while ago (and also 
 just now), and it didn't seem like a drop-in library change. (Am I 
 missing something?)


 You are right that there are a number of changes between 2.1 and 2.2, so 
 upgrading may (depending on your application) involve some rewriting of 
 your code. The easiest thing is probably to try to build against 2.2.x.
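
 If it helps, the dependency bump itself is small; a sketch assuming an sbt 
 build (the version numbers here are only illustrative):

   // build.sbt
   scalaVersion := "2.10.4"

   libraryDependencies ++= Seq(
     "com.typesafe.akka" %% "akka-actor"  % "2.2.4",
     "com.typesafe.akka" %% "akka-remote" % "2.2.4"
   )

 Building against 2.2.x will surface most source-level incompatibilities as 
 compile errors, which is usually the quickest way to scope the migration 
 effort.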

 Any other thoughts on how we might mitigate the issue without a 
 2.2 upgrade? (Thought I'd ask anyhow.)

 (Or do we... ahem... need to bite the bullet and do the upgrade? This is an 
 application that is undergoing minor bug fixes, but not major changes, 
 hence rolling a full regression etc. is not quite in the plan; we were 
 looking for a narrow fix/solution rather than (afaict) refactoring the Akka 
 usage.)


 I think that upgrading to 2.2 is the only available solution for the 
 thread leakage right now, unless you keep the same actor system around in 
 the app server.
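
 A minimal sketch of keeping a single long-lived system, assuming a servlet 
 container; the listener and attribute names are made up for illustration:

   import javax.servlet.{ServletContextEvent, ServletContextListener}
   import akka.actor.ActorSystem

   // Create the ActorSystem once when the web app starts and shut it down
   // once when it stops, instead of cycling systems while the server runs.
   class AkkaLifecycleListener extends ServletContextListener {
     @volatile private var system: ActorSystem = _

     override def contextInitialized(e: ServletContextEvent): Unit = {
       system = ActorSystem("app")
       e.getServletContext.setAttribute("akka.system", system)
     }

     override def contextDestroyed(e: ServletContextEvent): Unit = {
       if (system ne null) {
         system.shutdown()          // 2.1/2.2 API
         system.awaitTermination()
       }
     }
   }

 With one long-lived system per app server, shutdown (and the associated 
 thread churn) only happens when the server itself stops.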

 B/

 thx!

 On Sunday, June 23, 2013 9:55:35 AM UTC-7, √ wrote:

 I just want to clarify that there is no ship date for a 2.1.5 release at 
 this time.

 Cheers,
 √


 On Sun, Jun 23, 2013 at 12:44 PM, Matei Zaharia matei@gmail.com wrote:

 Awesome, thanks!

 Matei

 On Jun 23, 2013, at 12:39 PM, √iktor Ҡlang viktor...@gmail.com wrote:

 https://github.com/akka/akka/pull/1559/files


 On Sat, Jun 22, 2013 at 7:16 PM, Matei Zaharia matei@gmail.com wrote:

 Yes, it was in 2.1.4, and it looks like it's been fixed in 
 https://www.assembla.com/spaces/akka/tickets/3436. I'm curious, will the 
 fix be merged into 2.1.5, or should I upgrade to 2.2?

 Matei


 On Wednesday, June 12, 2013 8:07:53 AM UTC-4, Patrik Nordwall wrote:

 Thanks for reporting. We will look into it.
 Is it akka 2.1.4? Otherwise, please try with that version.
 /Patrik


 On Sun, Jun 9, 2013 at 1:07 AM, Matei Zaharia matei@gmail.com wrote:

 Hi,

 I've noticed that in Akka 2.1.x, setting the daemonic flag to true no 
 longer ensures that all of Akka's threads are daemons. Instead, if you 
 register any remote actors, there are some leftover threads like this:

 New I/O server boss #18 prio=10 tid=0x7f83949eb000 nid=0x5fb runnable [0x7f836d0a8000]
    java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
     - locked 0xfeae8170 (a sun.nio.ch.Util$2)
     - locked 0xfeae8180 (a java.util.Collections$UnmodifiableSet)
     - locked 0xfeae8128 (a sun.nio.ch.EPollSelectorImpl)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:102)
     at org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:206)
     at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
     at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
     at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
     at java.lang.Thread.run(Thread.java:722)

 New I/O worker #17 prio=10 tid

Re: [akka-user] Daemonic Akka creating non-daemon threads

2014-01-13 Thread Jeff Crilly
We've had some recent production issues and it looks like this is the 
exact issue biting us (this results in a need to restart our app servers 
rather periodically).
The app is on Akka 2.1.4.

I scanned the akka 2.1.x to 2.2.x migration guide a while ago (and also 
just now), and it didn't seem like a drop-in library change. (Am I 
missing something?)

Any other thoughts on how we might mitigate the issue without a 2.2 
upgrade? (Thought I'd ask anyhow.)

(Or do we... ahem... need to bite the bullet and do the upgrade? This is an 
application that is undergoing minor bug fixes, but not major changes, 
hence rolling a full regression etc. is not quite in the plan; we were 
looking for a narrow fix/solution rather than (afaict) refactoring the Akka 
usage.)

thx!

On Sunday, June 23, 2013 9:55:35 AM UTC-7, √ wrote:

 I just want to clarify that there is no ship date for a 2.1.5 release at 
 this time.

 Cheers,
 √


 On Sun, Jun 23, 2013 at 12:44 PM, Matei Zaharia matei@gmail.com wrote:

 Awesome, thanks!

 Matei

 On Jun 23, 2013, at 12:39 PM, √iktor Ҡlang viktor...@gmail.com wrote:

 https://github.com/akka/akka/pull/1559/files


 On Sat, Jun 22, 2013 at 7:16 PM, Matei Zaharia matei@gmail.com wrote:

 Yes, it was in 2.1.4, and it looks like it's been fixed in 
 https://www.assembla.com/spaces/akka/tickets/3436. I'm curious, will 
 the fix be merged into 2.1.5, or should I upgrade to 2.2?

 Matei


 On Wednesday, June 12, 2013 8:07:53 AM UTC-4, Patrik Nordwall wrote:

 Thanks for reporting. We will look into it.
 Is it akka 2.1.4? Otherwise, please try with that version.
 /Patrik


 On Sun, Jun 9, 2013 at 1:07 AM, Matei Zaharia matei@gmail.com wrote:

 Hi,

 I've noticed that in Akka 2.1.x, setting the daemonic flag to true 
 no longer ensures that all of Akka's threads are daemons. Instead, if you 
 register any remote actors, there are some leftover threads like this:

 New I/O server boss #18 prio=10 tid=0x7f83949eb000 nid=0x5fb runnable [0x7f836d0a8000]
    java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
     - locked 0xfeae8170 (a sun.nio.ch.Util$2)
     - locked 0xfeae8180 (a java.util.Collections$UnmodifiableSet)
     - locked 0xfeae8128 (a sun.nio.ch.EPollSelectorImpl)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:102)
     at org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:206)
     at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
     at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
     at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
     at java.lang.Thread.run(Thread.java:722)

 New I/O worker #17 prio=10 tid=0x7f83949d9000 nid=0x5fa runnable [0x7f836d1a9000]
    java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
     - locked 0xfeb200e8 (a sun.nio.ch.Util$2)
     - locked 0xfeb200f8 (a java.util.Collections$UnmodifiableSet)
     - locked 0xfeb200a0 (a sun.nio.ch.EPollSelectorImpl)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
     at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:64)
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:409)
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:206)
     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
     at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
     at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
     at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
     at java.lang.Thread.run(Thread.java:722)


 Calling ActorSystem.shutdown() does shut these down but, I'd
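
For reference, the flag being discussed is the akka.daemonic configuration 
setting. A minimal sketch of enabling it, with the config built inline purely 
for illustration (an application.conf entry works the same way):

   import akka.actor.ActorSystem
   import com.typesafe.config.ConfigFactory

   // akka.daemonic = on makes Akka create its dispatcher threads as daemon
   // threads; the report above is that, before 2.2, the Netty threads used
   // by remoting were not covered by this flag.
   val config = ConfigFactory.parseString("akka.daemonic = on")
     .withFallback(ConfigFactory.load())

   val system = ActorSystem("example", config)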