Re: [akka-user] Akka stream out of band data

2018-02-17 Thread Christopher Hunt


> On 18 Feb 2018, at 11:53, Christopher Hunt wrote:
> 
> By way of solution, perhaps this could be achieved similarly to the 
> materialiser API calls, e.g. viaMat/viaOob.

It just occurred to me that compound types might be the way to solve this, e.g. 
`new Element with OOB[Span]`. I shall experiment further... and then PR the 
Akka Streams docs with a section on OOB/instrumentation if there’s interest. I’m 
very interested to hear what others think, though. 
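
A minimal sketch of the compound-type idea, using only the Scala standard library. `Span` here is a hypothetical stand-in case class (not the real OpenTracing interface), and `OOB`, `Element`, and `OobSketch` are illustrative names rather than anything in Akka:

```scala
// Hypothetical stand-in for an OpenTracing span; not the real io.opentracing.Span.
final case class Span(traceId: String)

// Mix-in that carries out-of-band data alongside an element.
trait OOB[A] {
  def oob: A
}

// An ordinary stream element that knows nothing about tracing.
class Element(val payload: String)

object OobSketch {
  // Business logic sees only the Element part of the compound type.
  def describe(e: Element): String = s"payload=${e.payload}"

  // Instrumentation asks for the compound type and reads the span.
  def traceIdOf(e: Element with OOB[Span]): String = e.oob.traceId

  // Attach a span to a payload via an anonymous compound-type instance.
  def tagged(payload: String, span: Span): Element with OOB[Span] =
    new Element(payload) with OOB[Span] { val oob: Span = span }
}
```

Stages that only pass elements along would keep the span for free, but any stage that builds a new element (a plain `map`, for instance) would need to re-attach it, which is probably where an API like viaOob would have to help.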

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka stream out of band data

2018-02-17 Thread Christopher Hunt
Hi there,

I’ve been wondering recently whether there’s been any thought given to carrying 
out-of-band data through an Akka Stream, in the spirit of network streams: 
https://en.m.wikipedia.org/wiki/Out-of-band_data 

One use case is carrying OpenTracing spans with elements without polluting 
stream stage operations. To illustrate: 
https://github.com/akka/alpakka/issues/463#issuecomment-365765409

By way of solution, perhaps this could be achieved similarly to the 
materialiser API calls, e.g. viaMat/viaOob.

Thoughts?

Cheers 
C



Re: [akka-user] Question on Akka router

2018-02-17 Thread Patrik Nordwall
The drawback of first sending everything to one single node would be a
“single point of bottleneck” and an extra network hop for many messages.

Even though all frontend nodes might not have exactly the same load
information at the same time, that should be fine anyway.

Load-based routing is always difficult since it acts on “historical”
information and it’s easy to get oscillations. This also means that the
distributed nature of the information is of less importance. It’s only an
approximation that is changing anyway.
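
A toy illustration of the oscillation described above, using only the Scala standard library. It assumes a deliberately simplified model (two backends, a greedy "pick the least loaded" rule, and a load snapshot that is always one round old) and is not how Akka's adaptive router is implemented; `StaleLoadSketch` and `chosenBackends` are hypothetical names:

```scala
object StaleLoadSketch {
  // Route `rounds` batches of `batch` messages between two backends.
  // The router only ever sees the load snapshot reported after the previous round.
  def chosenBackends(rounds: Int, batch: Int): List[Int] = {
    var snapshot = Vector(0, 0) // load per backend as last reported
    val chosen = List.newBuilder[Int]
    for (_ <- 1 to rounds) {
      // Greedy choice based on stale information.
      val target = if (snapshot(0) <= snapshot(1)) 0 else 1
      chosen += target
      // The whole batch lands on the target; the other backend has drained.
      snapshot = Vector(0, 0).updated(target, batch)
    }
    chosen.result()
  }
}
```

In this model the router flips between the two backends every round, because by the time it reacts to a report the situation has already reversed.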

/Patrik
On Sat, 17 Feb 2018 at 06:37, Grace wrote:

> Greetings!
>
> I am new to Akka and am trying to understand routing.
>
> In the example below, I have one frontend and three backends running.
> Suppose I run two frontends; my understanding is that each frontend would
> have its own instance of the router, is that correct? In that case, wouldn't
> the balancing logic be not quite correct, because the routers are not
> coordinated with one another? Is it common practice to create a centralized
> router (one single router) that all clients (frontends) go to? What are the
> pros and cons of the single-router design, and do you know of an example or
> article about it?
>
> Or is the design below already sufficient for properly balanced routing?
>
> Thanks,
> Grace
>
> package com.packt.akka.loadBalancing
>
> import com.packt.akka.commons.Add
>
> object LoadBalancingApp extends App {
>
>   // initiate three nodes from backend
>   Backend.initiate(2551)
>   Backend.initiate(2552)
>   Backend.initiate(2561)
>
>   // initiate frontend node
>   Frontend.initiate()
>
>   // NOTE: the frontend actor is only created inside registerOnMemberUp, so
>   // getFrontend stays null until the cluster has formed; 1 ms is very
>   // likely too short a wait.
>   Thread.sleep(1)
>
>   Frontend.getFrontend ! Add(2, 4)
> }
>
> Loadbalancer.conf
>
> akka {
>   actor {
>     provider = "akka.cluster.ClusterActorRefProvider"
>   }
>   remote {
>     log-remote-lifecycle-events = off
>     netty.tcp {
>       hostname = "127.0.0.1"
>       port = 0
>     }
>   }
>
>   cluster {
>     seed-nodes = [
>       "akka.tcp://ClusterSystem@127.0.0.1:2551",
>       "akka.tcp://ClusterSystem@127.0.0.1:2552"]
>
>     auto-down-unreachable-after = 10s
>   }
> }
>
> akka.cluster.min-nr-of-members = 3
>
> akka.cluster.role {
>   frontend.min-nr-of-members = 1
>   backend.min-nr-of-members = 2
> }
>
> akka.actor.deployment {
>   /frontend/backendRouter {
>     # Router type provided by metrics extension.
>     router = adaptive-group
>     # Router parameter specific for metrics extension.
>     # metrics-selector = heap
>     # metrics-selector = load
>     # metrics-selector = cpu
>     metrics-selector = mix
>     #
>     nr-of-instances = 100
>     routees.paths = ["/user/backend"]
>     cluster {
>       enabled = on
>       use-role = backend
>       allow-local-routees = off
>     }
>   }
> }
>
> Backend.scala
>
> import akka.actor.{Actor, ActorSystem, Props}
> import com.packt.akka.commons.Add
> import com.typesafe.config.ConfigFactory
>
> class Backend extends Actor {
>
>   def receive = {
>     case Add(num1, num2) =>
>       println(s"I'm a backend with path: ${self} and I received add operation.")
>   }
> }
>
> object Backend {
>   def initiate(port: Int): Unit = {
>     // Port override and role first, then the shared loadbalancer config.
>     val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
>       withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
>       withFallback(ConfigFactory.load("loadbalancer"))
>
>     val system = ActorSystem("ClusterSystem", config)
>
>     val Backend = system.actorOf(Props[Backend], name = "backend")
>   }
> }
>
> Frontend.scala
>
> import akka.actor.{Actor, ActorRef, ActorSystem, Props}
> import akka.cluster.Cluster
> import akka.routing.FromConfig
> import com.packt.akka.commons.Add
> import com.typesafe.config.ConfigFactory
> import scala.concurrent.duration._
> import scala.util.Random
>
> class Frontend extends Actor {
>   import context.dispatcher
>
>   // Router settings come from akka.actor.deployment./frontend/backendRouter.
>   val backend = context.actorOf(FromConfig.props(), name = "backendRouter")
>
>   context.system.scheduler.schedule(3.seconds, 3.seconds, self,
>     Add(Random.nextInt(100), Random.nextInt(100)))
>
>   def receive = {
>     case addOp: Add =>
>       println("Frontend: I'll forward add operation to backend node to handle it.")
>       backend forward addOp
>   }
> }
>
> object Frontend {
>
>   private var _frontend: ActorRef = _
>
>   val upToN = 200
>
>   def initiate() = {
>     val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
>       withFallback(ConfigFactory.load("loadbalancer"))
>
>     val system = ActorSystem("ClusterSystem", config)
>     system.log.info("Frontend will start when 2 backend members in the cluster.")
>     //#registerOnUp
>     Cluster(system) registerOnMemberUp {
>       _frontend = system.actorOf(Props[Frontend],
>         name = "frontend")
>     }
>     //#registerOnUp
>   }
>
>   def getFrontend = _frontend
> }
>

Re: [akka-user] Getting a timeout error when testing a stream with mapAsync stage

2018-02-17 Thread Jakub Kahovec
I see. Thank you Patrik for the explanation.
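
A minimal sketch of the approach Patrik describes in the quoted reply, namely requesting more than the number of expected elements so that the completion signal is not held back for lack of demand. It assumes akka-stream and akka-stream-testkit 2.5.x on the classpath; `MapAsyncCompletionSketch` and `run` are illustrative names only:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestSubscriber
import scala.concurrent.Future

object MapAsyncCompletionSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext needed by Future.map

    try {
      val sourceUnderTest =
        Source(1 to 2).mapAsync(2)(i => Future.successful(i * 2).map(identity))

      val c = TestSubscriber.manualProbe[Int]()
      sourceUnderTest.to(Sink.fromSubscriber(c)).run()
      val sub = c.expectSubscription()

      // Two elements are expected, so request three: the spare demand means
      // the test does not depend on when a stage chooses to signal completion.
      sub.request(3)
      val received = c.expectNextN(2)
      c.expectComplete()
      received
    } finally {
      system.terminate()
    }
  }
}
```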

Jakub

On Friday, February 16, 2018 at 9:55:45 PM UTC+1, Patrik Nordwall wrote:
>
> It is allowed to "delay" delivery of the complete signal when there is no 
> demand. This behavior is undefined, i.e. some stages deliver it 
> immediately, some only when demand is requested. This is as expected, and in 
> such a test you have to request enough to be sure to get the completed signal.
>
> /Patrik
> On Fri, 16 Feb 2018 at 09:02, Jakub Kahovec wrote:
>
>> Hi,
>>
>>
>> when using Akka Streams (2.5.9) I've recently bumped into a problem when 
>> testing a stream with a mapAsync stage. It unexpectedly ends with an 
>> assertion failing on "timeout (3 seconds) during expectMsg while 
>> waiting for OnComplete".
>>
>>
>> To demonstrate it I've created a simple example. 
>>
>>
>> The code below works as expected.
>>
>>
>> val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i * 2))
>>
>> val c = TestSubscriber.manualProbe[Int]()
>> val p = sourceUnderTest.to(Sink.fromSubscriber(c)).run()
>> val sub = c.expectSubscription()
>>
>> sub.request(2)
>> c.expectNextN(2)   // List(2,4)
>> c.expectComplete()   // akka.stream.testkit.TestSubscriber$ManualProbe@fc258b1
>>
>>
>> However, when I add another mapping function (.map(identity) on the 
>> Future), the test fails with a timeout. When I additionally call 
>> sub.request(1) (commented out below), it ends correctly. So it looks like 
>> the mapping function adds an additional item to the stream, which seems 
>> strange.
>>
>>   
>> val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i * 2).map(identity))
>>
>> val c = TestSubscriber.manualProbe[Int]()
>> val p = sourceUnderTest.to(Sink.fromSubscriber(c)).run()
>> val sub = c.expectSubscription()
>>
>> sub.request(2)
>> c.expectNextN(2)   // List(2,4)
>> // sub.request(1)
>> c.expectComplete()   // ends with java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete
>>
>>
>> Can anyone explain this strange behaviour?
>>
>>
>> Thanks
>>
>> Jakub
>>
>
