Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]

2014-12-27 Thread Piotr Buszka
Hi,

I wanted as well to use ActorPublisher/ActorSubcriber pair. My use case was 
related to dynamically scaled number of wokers on a transforming task with 
a possible worker failure, restart and redo (based on Work Pulling pattern 
by Derek Wyatt
 on 
http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
 
)

I wanted to use streams for all other processing but this particular stage 
needs to be dynamically scaled so I cannot use cookbook pattern Balancing 
jobs to a fixed pool of workers and I am not yet sure if i can use actor 
ask pattern from Globally limiting the rate of a set of streams example 
to achieve desired result.

Independently of my particular problem I think the question could be 
restated what is the best practice in interfacing a flow transformer in 
akka streams with other parts of system which use actors. To be more exact 
is it possible for a Flow transformer (Sink and Source works) to process 
actor messages from external world which goes beyond actor ask pattern from 
Limiter example and be able to receive and process external actor messages 
in a thread safe way. I think it implies that the underlying actor must be 
the same instance for stream management and custom processing as it is for 
an ActorSubcriber or ActorPublisher.

I imagine the api would extend the existing PropsSource and PropsSink  and 
add a PropsFlow

trait ActorFlow[In, Out] extends ActorPublisher[Out] with ActorSubscriber { 
.. }

class MyActorFlow extends ActorFlow[In, Out] { ... }

val props = Props(classOf[MyActorFlow])
val flowActor = PropsFlow[In, Out](props)

val source = ...
val sink = 

val flowGraph = FlowGraph { implicit builder =
source ~ flowActor ~ sink
  }

I think all pieces are already available with ActorPublisher and 
ActorSubscriber infrastructure and the missing part is some API code to 
support ActorFlow in my example.

Best regards,

Piotr Buszka


On Friday, 26 December 2014 02:51:19 UTC+1, Konrad Malawski wrote:

 Hi Mark,
 would you mind explaining what exact use case you’re trying to cover?
 Instead of dropping down to reactive streams interfaces (publisher / 
 subscriber) which are *very low-level*,
 instead you could perhaps stick with our much simpler to use custom Stages.
 Here’s the docs on implementing custom stages: 
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html

 And there are also plenty examples in the stream cookbook: 
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
 which could be useful when implementing your custom processing stages.

 Hope this helps, happy hakking!

 -- 
 Konrad 'ktoso' Malawski
 hAkker @ typesafe
 http://akka.io

 On 25 December 2014 at 14:13:32, Mark van Buskirk (markvan...@gmail.com 
 javascript:) wrote:
 Id love to see an example of two actors that are both publisher and 
 subscriber connected. I've been trying to get that going and have been 
 having a bunch of trouble getting it working. 

 -- 
  Read the docs: http://akka.io/docs/ 
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html 
  Search the archives: https://groups.google.com/group/akka-user 
 --- 
 You received this message because you are subscribed to the Google Groups 
 Akka User List group. 
 To unsubscribe from this group and stop receiving emails from it, send an 
 email to akka-user+...@googlegroups.com javascript:. 
 To post to this group, send email to akka...@googlegroups.com 
 javascript:. 
 Visit this group at http://groups.google.com/group/akka-user. 
 For more options, visit https://groups.google.com/d/optout. 


-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Recording latency of operations that return a Future inside an akka stream

2014-12-27 Thread Konrad 'ktoso' Malawski
Hi Soumya,
I don’t think what you’ll end up measuring this way will be very useful. I 
mean, between the completion of the future and the triggering of the map there 
are multiple asynchronous boundaries… So you won’t be measuring how fast the 
set operation was, but how much time was between these asynchronous boundaries 
- which could have been backpressured by the way.

I suggest directly wrapping the set call with your measurement logic instead - 
since that is what you want to measure it seems.

By the way, we do have a “timed” element, in our extras section: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/extra/Timed.scala
 You can `import Timed._` and then use it as shown here: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala
It’s a rather old element and I’m not sure if we’ll be keeping it, but you can 
use it as a source of inspiration in case you end up needing that kind of 
measurement.



On 26 December 2014 at 05:46:55, Soumya Simanta (soumya.sima...@gmail.com) 
wrote:


This is related to this thread but sufficiently different that I decided to 
create new thread. Hope that's okay. 

I would like to create a histogram of latency of a large number of set 
operations ( set returns a Future[Boolean]) using LatencyUtils 

 Basically I need to start recording the time before the set operation (inside 
mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record 
the end time in a map operation( .map( //record the end time here) after this. 
I'm having a hard time trying to figure this out. 

My understanding is that the even though the mapAsyncUnordered doesn't maintain 
the order of operations the map following the mapAsynUnordered will maintain 
the order from the previous stage because of TCP maintaining the order. Is this 
correct? 


val redis = RedisClient(localhost)

val random1 = UUID.randomUUID().toString

def insertValues(rnd: String): Flow[Int, Boolean] = {
    Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)).map( //record 
the end time here)
}

val blackhole = BlackholeSink

val maxSeq = 500
val seqSource = Source( () = (1 to maxSeq).iterator )
val streamWithSeqSource = 
seqSource.via(insertValues(random1)).runWith(blackhole)


--
 Read the docs: http://akka.io/docs/
 Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
 Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]

2014-12-27 Thread Mark van Buskirk
I think I'm clinging to trying to use actors as both publisher and
subscriber because I like actor supervision and I don't see how you do
supervision with streams. Does a custom
Stage / transformer have supervision where exceptions/errors can cause a
restart/resume/escalate that affects a particular steam element but let's
the following elements continue on?

What am I missing? I feel like there is something big here that I'm just
not seeing. I want backpressure, supervision and the ability to make
cohesive little modules of code, I.e. micro services. Then I want to pass
commands and domain objects between these microservices. I want back
pressure, and supervision at all levels. Am I misguided in this design? Am
I missing something obvious?

On Sat, Dec 27, 2014, 05:12 Piotr Buszka pi...@buszka.eu wrote:

 Hi,

 I wanted as well to use ActorPublisher/ActorSubcriber pair. My use case
 was related to dynamically scaled number of wokers on a transforming task
 with a possible worker failure, restart and redo (based on Work Pulling
 pattern by Derek Wyatt
  on
 http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
 )

 I wanted to use streams for all other processing but this particular stage
 needs to be dynamically scaled so I cannot use cookbook pattern Balancing
 jobs to a fixed pool of workers and I am not yet sure if i can use actor
 ask pattern from Globally limiting the rate of a set of streams example
 to achieve desired result.

 Independently of my particular problem I think the question could be
 restated what is the best practice in interfacing a flow transformer in
 akka streams with other parts of system which use actors. To be more exact
 is it possible for a Flow transformer (Sink and Source works) to process
 actor messages from external world which goes beyond actor ask pattern from
 Limiter example and be able to receive and process external actor messages
 in a thread safe way. I think it implies that the underlying actor must be
 the same instance for stream management and custom processing as it is for
 an ActorSubcriber or ActorPublisher.

 I imagine the api would extend the existing PropsSource and PropsSink  and
 add a PropsFlow

 trait ActorFlow[In, Out] extends ActorPublisher[Out] with ActorSubscriber
 { .. }

 class MyActorFlow extends ActorFlow[In, Out] { ... }

 val props = Props(classOf[MyActorFlow])
 val flowActor = PropsFlow[In, Out](props)

 val source = ...
 val sink = 

 val flowGraph = FlowGraph { implicit builder =
 source ~ flowActor ~ sink
   }

 I think all pieces are already available with ActorPublisher and
 ActorSubscriber infrastructure and the missing part is some API code to
 support ActorFlow in my example.

 Best regards,

 Piotr Buszka


 On Friday, 26 December 2014 02:51:19 UTC+1, Konrad Malawski wrote:

 Hi Mark,
 would you mind explaining what exact use case you’re trying to cover?
 Instead of dropping down to reactive streams interfaces (publisher /
 subscriber) which are *very low-level*,
 instead you could perhaps stick with our much simpler to use custom
 Stages.
 Here’s the docs on implementing custom stages: http://doc.akka.io/
 docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html

 And there are also plenty examples in the stream cookbook:
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/
 stream-cookbook.html
 which could be useful when implementing your custom processing stages.

 Hope this helps, happy hakking!

 --
 Konrad 'ktoso' Malawski
 hAkker @ typesafe
 http://akka.io

 On 25 December 2014 at 14:13:32, Mark van Buskirk (markvan...@gmail.com)
 wrote:

 Id love to see an example of two actors that are both publisher and
 subscriber connected. I've been trying to get that going and have been
 having a bunch of trouble getting it working.

 --
  Read the docs: http://akka.io/docs/
  Check the FAQ: http://doc.akka.io/docs/akka/
 current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to the Google Groups
 Akka User List group.

 To unsubscribe from this group and stop receiving emails from it, send an
 email to akka-user+...@googlegroups.com.
 To post to this group, send email to akka...@googlegroups.com.

 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.



-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.

Re: [akka-user] [akka-stream 0.9] Create Flow[A,B] from Publisher[A] and Subscriber[B]

2014-12-27 Thread Konrad Malawski
Hi Mark,
You're completely right a supervision of  streams has been on our minds for
a long time now. Sadly you bumped into an yet not implemented area of Akka
streams, here's the issue we're talking about here -
https://github.com/akka/akka/issues/15710

We had multiple talks on how to apply supervision to streams and think
we've prepared the infrastructure for it yet this exact feature remains to
be implemented before we ship a full 1.0 (please remember the current
release is still a preview.).

Thanks for the feedback and happy hakking!

-- 
Konrad 'ktoso' Malawski
(sent from my mobile)
On 27 Dec 2014 15:04, Mark van Buskirk markvanbusk...@gmail.com wrote:

 I think I'm clinging to trying to use actors as both publisher and
 subscriber because I like actor supervision and I don't see how you do
 supervision with streams. Does a custom
 Stage / transformer have supervision where exceptions/errors can cause a
 restart/resume/escalate that affects a particular steam element but let's
 the following elements continue on?

 What am I missing? I feel like there is something big here that I'm just
 not seeing. I want backpressure, supervision and the ability to make
 cohesive little modules of code, I.e. micro services. Then I want to pass
 commands and domain objects between these microservices. I want back
 pressure, and supervision at all levels. Am I misguided in this design? Am
 I missing something obvious?

 On Sat, Dec 27, 2014, 05:12 Piotr Buszka pi...@buszka.eu wrote:

 Hi,

 I wanted as well to use ActorPublisher/ActorSubcriber pair. My use case
 was related to dynamically scaled number of wokers on a transforming task
 with a possible worker failure, restart and redo (based on Work Pulling
 pattern by Derek Wyatt
  on
 http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
 )

 I wanted to use streams for all other processing but this particular
 stage needs to be dynamically scaled so I cannot use cookbook pattern
 Balancing jobs to a fixed pool of workers and I am not yet sure if i can
 use actor ask pattern from Globally limiting the rate of a set of streams
 example to achieve desired result.

 Independently of my particular problem I think the question could be
 restated what is the best practice in interfacing a flow transformer in
 akka streams with other parts of system which use actors. To be more exact
 is it possible for a Flow transformer (Sink and Source works) to process
 actor messages from external world which goes beyond actor ask pattern from
 Limiter example and be able to receive and process external actor messages
 in a thread safe way. I think it implies that the underlying actor must be
 the same instance for stream management and custom processing as it is for
 an ActorSubcriber or ActorPublisher.

 I imagine the api would extend the existing PropsSource and PropsSink
  and add a PropsFlow

 trait ActorFlow[In, Out] extends ActorPublisher[Out] with ActorSubscriber
 { .. }

 class MyActorFlow extends ActorFlow[In, Out] { ... }

 val props = Props(classOf[MyActorFlow])
 val flowActor = PropsFlow[In, Out](props)

 val source = ...
 val sink = 

 val flowGraph = FlowGraph { implicit builder =
 source ~ flowActor ~ sink
   }

 I think all pieces are already available with ActorPublisher and
 ActorSubscriber infrastructure and the missing part is some API code to
 support ActorFlow in my example.

 Best regards,

 Piotr Buszka


 On Friday, 26 December 2014 02:51:19 UTC+1, Konrad Malawski wrote:

 Hi Mark,
 would you mind explaining what exact use case you’re trying to cover?
 Instead of dropping down to reactive streams interfaces (publisher /
 subscriber) which are *very low-level*,
 instead you could perhaps stick with our much simpler to use custom
 Stages.
 Here’s the docs on implementing custom stages: http://doc.akka.io/
 docs/akka-stream-and-http-experimental/1.0-M2/scala/
 stream-customize.html

 And there are also plenty examples in the stream cookbook:
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/
 stream-cookbook.html
 which could be useful when implementing your custom processing stages.

 Hope this helps, happy hakking!

 --
 Konrad 'ktoso' Malawski
 hAkker @ typesafe
 http://akka.io

 On 25 December 2014 at 14:13:32, Mark van Buskirk (markvan...@gmail.com)
 wrote:

 Id love to see an example of two actors that are both publisher and
 subscriber connected. I've been trying to get that going and have been
 having a bunch of trouble getting it working.

 --
  Read the docs: http://akka.io/docs/
  Check the FAQ: http://doc.akka.io/docs/akka/
 current/additional/faq.html
  Search the archives: https://groups.google.com/
 group/akka-user
 ---
 You received this message because you are subscribed to the Google
 Groups Akka User List group.

 To unsubscribe from this group and stop receiving emails from it, send an
 email to akka-user+...@googlegroups.com.
 To post to this group, send email to