Re: [akka-user][deprecated] Akka Stream : MergeHub and BroadcastHub

2020-04-13 Thread Alexey Shuksto
> To be honest I'm not sure of how messages are handled when there are no 
active Source[T] instances consuming messages sent to Broadcast.sink[T]...
> My impression is that they would be thrown away, and the bufferSize 
parameter to BroadcastHub.sink[T] only comes into play when there are one 
or more active graphs from (3) consuming messages.

BroadcastHub is blocked until _all_ of the connected to it's materialized 
Source sinks signaled a demand.

Thus, if you want hub to discard all elements until there is some 
meaningful consumer, you need to attach draining no-op sink to it right 
after materialization of BroadcastHub.sink[T].

пятница, 10 апреля 2020 г., 21:11:40 UTC+3 пользователь Brian Maso написал:
>
> I suggest you post your question on the gitter channel (
> gitter.im/akka/akka). There are a lot of knowledgeable people who can 
> answer, and I think it is a much more active space than this (deprecated) 
> list.
>
> But to answer your question: the materialized value of an asynchronous 
> Sink is a Future[T]. The materialized value of a BroadcastHub.sink[T] is a 
> Source[T]. Not every materialized value is a Future[_].
>
> So you have basically three parts:
> 1) Your original runnable graph into which the BroadcastHub.sink[T] is 
> embedded
> 2) A single Source[T] materialized when (1) is run -- this is a 
> re-usable "blueprint" which can be used to define multiple new runnable 
> graphs.
> 3) 0 or more runnable graphs that receive messages through (2)
>
> Each message sent the BroadcastHub.sink[T] during the course of (1)'s run 
> will be queued up and delivered (ie "broadcast") to all of the runnable 
> graphs in (3) when they are run. You can re-use the Source[T] from (2) 
> multiple times, effectively allowing you to dynamically "tap" the flow of 
> messages being sent to the Broadcast.sink[T] from (1).
>
> (To be honest I'm not sure of how messages are handled when there are no 
> active Source[T] instances consuming messages sent to Broadcast.sink[T]... 
> My impression is that they would be thrown away, and the bufferSize 
> parameter to BroadcastHub.sink[T] only comes into play when there are one 
> or more active graphs from (3) consuming messages. Experimentation is 
> probably necessary to confirm that.) 
>
> Best regards,
> Brian Maso
>
> On Fri, Apr 10, 2020 at 10:20 AM Christophe De Troyer <
> christoph...@gmail.com > wrote:
>
>> Hi all, 
>>
>> I've been looking at MergeHub and BroadcastHub for Akka Stream and I am a 
>> bit confused. 
>>
>> In the beginning of the documentation the following is mentioned:
>>
>> It is important to remember that even after constructing the 
>>> RunnableGraph by connecting all the source, sink and different 
>>> operators, no data will flow through it until it is materialized. 
>>> Materialization is the process of allocating all resources needed to run 
>>> the computation described by a Graph (in Akka Streams this will often 
>>> involve starting up Actors). 
>>> ... 
>>> After running (materializing) the RunnableGraph[T] we get back the 
>>> materialized value of type T.
>>>
>>  
>> This makes perfect sense. But I'm having issues uniting this with the 
>> code sample from the MergeHub documentation.
>>
>> // A simple producer that publishes a new "message" every second
>> val producer : Source[String, Cancellable]= Source.tick(1.second, 1.second, 
>> "New message")
>>
>> // Attach a BroadcastHub Sink to the producer. This will materialize to a
>> // corresponding Source.
>> // (We need to use toMat and Keep.right since by default the materialized
>> // value to the left is used)
>> val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
>>   producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
>>
>> // By running/materializing the producer, we get back a Source, which
>> // gives us access to the elements published by the producer.
>> val fromProducer: Source[String, NotUsed] = runnableGraph.run()
>>
>> // Print out messages from the producer in two independent consumers
>> fromProducer.runForeach(msg => println("consumer1: " + msg))
>> fromProducer.runForeach(msg => println("consumer2: " + msg))
>>
>>
>> In the above snippet a Source is created, and that is used as the Source 
>> for a runnableGraph (meaning it has a source and a sink). Conceptually I 
>> understand that the BroadcastHub is indeed a sink. But what I do not 
>> understand is that when you run/materialize that RunnableGraph, you get 
>> back a Source.
>>
>> The way I see it, running a graph should return a future of the types of 
>> values flowing through that graph. In this case Strings.
>>
>> Can somebody shed some light on this, please? 
>>
>>  Thanks,
>> Christophe
>>
>> -- 
>>
>> *
>> ** New discussion forum: https://discuss.akka.io/ replacing akka-user 
>> google-group soon.
>> ** This group will soon be put into read-only mode, and replaced by 
>> 

[akka-user] Ask not working if Future is used in receive partial method

2017-10-13 Thread Alexey Shuksto
Hi there,

It's prohibited to leak and close over actor internals. In your case Future not 
only closes over 'sender()', which is very bad and causes timeout (cause by the 
time future is complete sender() is null), you also modify actor internal state 
inside it, assigning to 'student', which is just horrible. 

In your example there us no need for Future at all. If you try to model some 
prolonged action, you should fix your sender by 'val reply = sender()' before 
Future invocation and send results after it back to sender and self, to update 
your student. 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] How to guarantee parallel execution of Sunflower?

2017-08-13 Thread Alexey Shuksto
Thanks, Patrick!

One more question regarding'.async' -- is there any point in adding it to 
Merge/BroadcastHub stages or their materialized sinks/sources?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] How to provide flow with 'last seen element' backup?

2017-08-07 Thread Alexey Shuksto
Hi, Johan,

Thanks for your reply, but would not `Sink.last` complete it's Future only 
after Source fully completed? I need to check for last passed value 
in-between:

```
val bh = Source.unfold(0)(v => Some((v+1, 
v))).delay(1.minute).runWith(BroadcastHub.sink)
bh.runWith(Sink.ignore)
```

So, if `bh` would be materialized after 2.5 minutes, it would not only 
observer value `3` after another 30 seconds, but also value `2` just after 
materialization.

понедельник, 7 августа 2017 г., 15:49:06 UTC+3 пользователь Akka Team 
написал:
>
> You can achieve this by using Sink.last which will materialize into a 
> Future[T] that completes with the last element emitted. 
>
> It does however not complete the promise with the last element on 
> failures, this can be remedied by prepending it with a 
> .recoverWithRetries(0,  { case _ => Source.empty }) making sure any 
> upstream failure leads to a completion of the stream.
>
> --
> Johan
> Akka Team
>
> On Sat, Jul 29, 2017 at 8:17 AM, Alexey Shuksto <sei...@gmail.com 
> > wrote:
>
>> Hello hAkkers,
>>
>> In our project we use some number of flows that follow same building 
>> logic:
>> ```
>> val input: Source[T, NotUsed] = ???
>> val output = input.runWith(BroadcastHub.sink)
>>
>> output.runWith(Sink.ignore)
>> ```
>>
>> Sink.ignore here used to prevent BroadcastHub inner buffer overflow and, 
>> also, to dump out old stream elements as they are usually do not matter.
>>
>> But in some rare cases we need to provide to clients of our source 'the 
>> last seen element' -- the last element which passed our flow before client 
>> materialization of BroadcastHub' Source.
>>
>> Currently it is done via some private volatile var:
>>
>> ```
>> @volatile private var last: Option[T] = None
>> _output.runForeach(e => last = Some(e))
>>
>> val output = _output.prepend(Source.lazily(() => last).collect { case 
>> Some(e} => e})
>> ```
>>
>> I wonder maybe there is better way to do so, without mutable state and 
>> stuff?
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Flow.fromSinkAndSource and backpressure

2017-08-02 Thread Alexey Shuksto
Hi Konrad,

Jeff was asking about 'backpressure propagation'. From my understanding that 
means that when source wasn't pulled, sink would not pull. And that's not how 
it works as you already mentioned. 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Flow.fromSinkAndSource and backpressure

2017-08-02 Thread Alexey Shuksto
Nope. More so, completion of one of them would not cause completion of another 
and flow itself. 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-stream] How to provide flow with 'last seen element' backup?

2017-07-29 Thread Alexey Shuksto
Hello hAkkers,

In our project we use some number of flows that follow same building logic:
```
val input: Source[T, NotUsed] = ???
val output = input.runWith(BroadcastHub.sink)

output.runWith(Sink.ignore)
```

Sink.ignore here used to prevent BroadcastHub inner buffer overflow and, 
also, to dump out old stream elements as they are usually do not matter.

But in some rare cases we need to provide to clients of our source 'the 
last seen element' -- the last element which passed our flow before client 
materialization of BroadcastHub' Source.

Currently it is done via some private volatile var:

```
@volatile private var last: Option[T] = None
_output.runForeach(e => last = Some(e))

val output = _output.prepend(Source.lazily(() => last).collect { case 
Some(e} => e})
```

I wonder maybe there is better way to do so, without mutable state and 
stuff?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Message [akka.stream.impl.fusing.ActorGraphInterpreter$Resume$] was not delivered in 'completeStage()' of BidiStage

2017-06-06 Thread Alexey Shuksto
Hello hAkkers,

I've got some filter-like bidi-stage that filters elements of second flow 
based on elements from first one. Like this:
  final class IOFilter[O] extends GraphStage[BidiShape[O, O, O, O]] {
val inO1: Inlet[O] = Inlet("IOFilter.inO1")
val inO2: Inlet[O] = Inlet("IOFilter.inO2")


val outO1: Outlet[O] = Outlet[O]("IOFilter.outO1")
val outO2: Outlet[O] = Outlet[Response[I]]("IOFilter.outO2")


val shape = BidiShape(inO1, outO1, inO2, outO2)


def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with StageLogging {
private var value: O = _


setHandler(outO1, () => pull(inO1))
setHandler(
  inO1,
  new InHandler {
override def onPush(): Unit = {
value = grab(inO1)
push(outO1, value)
maybePullO2()
}


override def onUpstreamFinish(): Unit = complete(outO1)
  }
)


setHandler(outO2, () => maybePullO2())
setHandler(
  inO2,
  new InHandler {
override def onPush(): Unit = {
  if (value == grab(inO2)) {
emit(outO2, value)


if (isClosed(inO1)) completeStage()
  } else maybePullO2()
}
  }
)


private def maybePullO2(): Unit = if (value != null) pull(inO2)
  }
  }

The problem is that after completion of 'outO2' outlet I receive log 
message about `Message 
[akka.stream.impl.fusing.ActorGraphInterpreter$Resume$] was not delivered:
akka.actor.RepointableActorRef - Message 
[akka.stream.impl.fusing.ActorGraphInterpreter$Resume$] from 
Actor[akka://pricing/user/StreamSupervisor-2/pricing-71-0-unknown-
operation#-1846184194] to 
Actor[akka://pricing/user/StreamSupervisor-2/pricing-71-0-unknown-operation#-1846184194]
 
was not delivered. [3] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.

This message do not appear if I do not complete stage after first flow 
completion but since second flow is potentially infinite it is not safe to 
do so.

Do I need to worry about this message or it is safe just to skip it?

Java 8, Scala 2.12.2, Akka 2.4.18.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] StreamConverters.fromOutputStream(..) stops processing messages

2017-05-16 Thread Alexey Shuksto
Thanks for the tip!

I've found the culpit -- threads were not blocked, they all were waiting 
for read from corresponding input streams, separate dispatcher for intput 
and output streams resolves this problem.

Still, I'm not comfortable with so many thread blocking on read, so I'll 
try to rewrite this with NuProcess library.

вторник, 16 мая 2017 г., 16:44:52 UTC+3 пользователь Martynas Mickevičius 
написал:
>
> It could be that all of the threads are blocked in the 
> default-blocking-io-dispatcher 
> Make a thread dump and take a look if that is the case.
>
> On Mon, May 15, 2017 at 8:17 PM Alexey Shuksto <sei...@gmail.com 
> > wrote:
>
>> Hello hAkkers,
>>
>> Our app spawns multiple external processes (via 
>> java.util.Runtime.exec(..)) and then writes to STDIN of such processes via:
>> ```
>> StreamConverters
>>   .fromOutputStream(() => process.getOutputStream, autoFlush = true)
>>   .runWith(MergeHub.source[])
>> ```
>>
>> MergeHub is attached to output stream to prevent closing of such stream 
>> after one of client done writing to it.
>>
>> While there are up to, say, four external processes everything is working 
>> fine, but as soon as they number increases, OutputStreamSubscriber just 
>> stops to request next elements for all of them.
>> I've checked via adding `.confate` for incoming ByteStrings -- buffer 
>> just keeps growing.
>>
>> It is possible to replace StreamConverters with simple
>> ```
>> Sink
>>   .foreach[ByteString] { 
>> process.getOutputStream.write(bs.toArray)
>> process.getOutputStream.flush()
>>   }
>> ```
>> and everything would work, but it's blocking and stuff.
>>
>> What may be the cause of this? I've tried to change settings of 
>> "default-blocking-io-dispatcher", but to no effect.
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-streams] StreamConverters.fromOutputStream(..) stops processing messages

2017-05-15 Thread Alexey Shuksto
Hello hAkkers,

Our app spawns multiple external processes (via java.util.Runtime.exec(..)) 
and then writes to STDIN of such processes via:
```
StreamConverters
  .fromOutputStream(() => process.getOutputStream, autoFlush = true)
  .runWith(MergeHub.source[])
```

MergeHub is attached to output stream to prevent closing of such stream 
after one of client done writing to it.

While there are up to, say, four external processes everything is working 
fine, but as soon as they number increases, OutputStreamSubscriber just 
stops to request next elements for all of them.
I've checked via adding `.confate` for incoming ByteStrings -- buffer just 
keeps growing.

It is possible to replace StreamConverters with simple
```
Sink
  .foreach[ByteString] { 
process.getOutputStream.write(bs.toArray)
process.getOutputStream.flush()
  }
```
and everything would work, but it's blocking and stuff.

What may be the cause of this? I've tried to change settings of 
"default-blocking-io-dispatcher", but to no effect.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-stream] MergeHub slows consumption when Source with 'delay' is merged to it -- bug?

2017-03-27 Thread Alexey Shuksto
Hello hAkkers,

I've got so very weird situation when 'MergeHub.source' with merged in it 
'Source.repeat(..).delay(..)' slowly decreases consumption rate of 
materialized Sinks.

Some simple synthetic test follows:

import java.time.Instant


import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}


import scala.concurrent.duration._


object DelayTest extends App {


  implicit val as = ActorSystem("delay")
  implicit val am = ActorMaterializer()


  val ticker = Source
.tick(0.second, 1.second, List('first, 'second, 'third, 'fourth, 'fifth, 
'sixth))
.mapConcat(identity)


  val hub = MergeHub
.source[Symbol]
.merge {
  Source.repeat('repeat).delay(15.seconds)
}
.toMat(Sink.foreach { s =>
  println(s"${Instant.now()} -- got [$s].")
})(Keep.left)
.run()


ticker.runWith(hub)


}

When run, messages from 'ticker' will appear through constantly increasing 
intervals.

As soon as we change 'Source.repeat(..).delay(..)' to 'Source.tick(..)' - 
problem disappears.

What could be a possible cause for this weird mechanics?

P.S.: Scala 2.12.1, Akka 2.4.17, Java 1.8.0_112.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Strange test failure in `Flow.filter(..)` with test-kit TestSink

2016-10-21 Thread Alexey Shuksto
Hello hAkkers,

Consider this simple test (scalatest 3.0.0, scalacheck 1.13.2 and 
akka-streams-testkit 2.4.11 is used):

  describe("Either") {
implicit def noShrink[T]: Shrink[List[T]] = Shrink.shrinkAny

it("should filter 'Either.Left(..)' objects") {

  forAll("left", "right.head", "right.tail") { (ls: List[String], rh: 
Int, rt: List[Int]) ⇒
val ri = rh :: rt

// Will fail if last element is not Right(...)
val source: List[Either[String, Int]] = ls.map(Left[String, Int]) 
::: ri.map(Right[String, Int])

val (pub, sub) = TestSource.probe[Either[String, Int]]
  .via(Flow[Either[String, Int]].filter(_.isRight).map { case Right(
i) ⇒ i })
  .toMat(TestSink.probe[Int])(Keep.both)
  .run()

sub.request(ri.size.toLong)

source.foreach(pub.sendNext)
sub.expectNextUnorderedN(ri)

pub.sendComplete()
sub.expectComplete()
  }
}
  }


This test will fail with `Message: assertion failed: timeout (3 seconds) 
during expectMsg while waiting for OnComplete` as soon as we change the 
order of `ls` and `ri` lists (or add some shuffle to it):

val source: List[Either[String, Int]] = ri.map(Right[String, Int]) ::: ls.
map(Left[String, Int])


Actually, it will fail as soon as latest element in Source is not 
`Right(..)`. Seems to me that leftover `Left` elements in Source do not 
allow TestSink.probe to receive onComplete message.

Is there any way to prevent that fail or is it normal and expected behavior?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
среда, 19 октября 2016 г., 15:47:31 UTC+3 пользователь √ написал:
>
> Ok, so a sort of "correlator"-stage?
>

Exactly.
 

> Yes, so the BidiFlow you create has shared state tied to the intance, not 
> the mateiralization.
> I think you'll need to create a custom GraphStage with a BidiShape.
>

Yep, thanks for your help -- I've implement such correlator as GraphStage, 
looks better now.

One minor question (docs aren't clear enough there): is it safe to drop 
`pull(in)` in `in` `onPush()` handler if such `pull(in)` happen in `out` 
`onPull()` handler?

In documentation for custom stages `in` pulled twice (both in `onPush()` 
and `onPull()`) and in code it is only pulled once.
 

>  
>
>>
>> среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:
>>
>>>
>>>
>>> On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto <sei...@gmail.com> 
>>> wrote:
>>>
>>>> 2 Konrad: Yep, in original question I meant not 'DSL construction time' 
>>>> but 'execution time' thread-safety. Thanks for clarification.
>>>>
>>>> 2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
>>>> future remote response in some shared state which then would be completed 
>>>> by incoming flow. 
>>>>
>>>
>>> So it's a bidirectional buffer of Promises and Futures?
>>>
>>>
>>>  
>>>
>>>> There could be as many promises as there were outgoing messages, but 
>>>> the order of responses are not guaranteed and there could be additional 
>>>> messages in incoming flow.
>>>>
>>>> What do you meant by 'actively prevents multiple materializations'?
>>>>
>>>
>>> What happens when you materialize that bidiflow N times?
>>>  
>>>
>>>>
>>>> среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>>>>>
>>>>> Hi Alexey,
>>>>>
>>>>> Not only is it not thread-safe, but it also actively prevents multiple 
>>>>> materializations.
>>>>>
>>>>> Perhaps if you state your use-case we can suggest an alternative?
>>>>>
>>>>> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto <sei...@gmail.com> 
>>>>> wrote:
>>>>>
>>>>>> Hello hAkkers,
>>>>>>
>>>>>> Simple example:
>>>>>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>>>>>   var counter = 0
>>>>>>   
>>>>>>   val outbound = b.add(Flow[String].map { str =>
>>>>>> counter += 1
>>>>>> str -> counter
>>>>>>   })
>>>>>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>>>>>> counter -= 1
>>>>>> pair._1
>>>>>>   })
>>>>>>   
>>>>>>   BidiShape.fromFlows(outbound, inbound)
>>>>>> })
>>>>>>
>>>>>> Can I presume that contents of 'build block' is thread-safe or I need 
>>>>>> to guard `counter` somehow (use `AtomicInt` and such)?
>>>>>>
>>>>>> Also, do BidiFlow support 'duplex' mode or they process 
>>>>>> incoming/outgoing messages one at time?
>>>>>>
>>>>>> -- 
>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>> >>>>>>>>>> Check the FAQ: 
>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>> >>>>>>>>>> Search the archives: 
>>>>>> https://groups.google.com/group/akka-user
>>>>>> --- 
>>>>>> You received this message because you are subscribed to the Google 
>>>>>> Groups "Akka User List" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it, 
>>>>>> send an email to akka-user+...@googlegroups.com.
>>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -- 
>>>>> Cheers,
>>>>> √
>>>>>
>>>&

Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
1. Flow itself is a bidi-codec from ByteString to our own Request/Response 
entities. Each Request has Promise[Response] attribute. Shared state is 
more like Map[Request.Id, Promise[Response]] -- because order of Responses 
are not guarantied.

2. It should have state shared only "inside" this one materialized flow.

среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:
>
>
>
> On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto <sei...@gmail.com 
> > wrote:
>
>> 2 Konrad: Yep, in original question I meant not 'DSL construction time' 
>> but 'execution time' thread-safety. Thanks for clarification.
>>
>> 2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
>> future remote response in some shared state which then would be completed 
>> by incoming flow. 
>>
>
> So it's a bidirectional buffer of Promises and Futures?
>
>
>  
>
>> There could be as many promises as there were outgoing messages, but the 
>> order of responses are not guaranteed and there could be additional 
>> messages in incoming flow.
>>
>> What do you meant by 'actively prevents multiple materializations'?
>>
>
> What happens when you materialize that bidiflow N times?
>  
>
>>
>> среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>>>
>>> Hi Alexey,
>>>
>>> Not only is it not thread-safe, but it also actively prevents multiple 
>>> materializations.
>>>
>>> Perhaps if you state your use-case we can suggest an alternative?
>>>
>>> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto <sei...@gmail.com> 
>>> wrote:
>>>
>>>> Hello hAkkers,
>>>>
>>>> Simple example:
>>>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>>>   var counter = 0
>>>>   
>>>>   val outbound = b.add(Flow[String].map { str =>
>>>> counter += 1
>>>> str -> counter
>>>>   })
>>>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>>>> counter -= 1
>>>> pair._1
>>>>   })
>>>>   
>>>>   BidiShape.fromFlows(outbound, inbound)
>>>> })
>>>>
>>>> Can I presume that contents of 'build block' is thread-safe or I need 
>>>> to guard `counter` somehow (use `AtomicInt` and such)?
>>>>
>>>> Also, do BidiFlow support 'duplex' mode or they process 
>>>> incoming/outgoing messages one at time?
>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
2 Konrad: Yep, in original question I meant not 'DSL construction time' but 
'execution time' thread-safety. Thanks for clarification.

2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
future remote response in some shared state which then would be completed 
by incoming flow. There could be as many promises as there were outgoing 
messages, but the order of responses are not guaranteed and there could be 
additional messages in incoming flow.

What do you meant by 'actively prevents multiple materializations'?

среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>
> Hi Alexey,
>
> Not only is it not thread-safe, but it also actively prevents multiple 
> materializations.
>
> Perhaps if you state your use-case we can suggest an alternative?
>
> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto <sei...@gmail.com 
> > wrote:
>
>> Hello hAkkers,
>>
>> Simple example:
>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>   var counter = 0
>>   
>>   val outbound = b.add(Flow[String].map { str =>
>> counter += 1
>> str -> counter
>>   })
>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>> counter -= 1
>> pair._1
>>   })
>>   
>>   BidiShape.fromFlows(outbound, inbound)
>> })
>>
>> Can I presume that contents of 'build block' is thread-safe or I need to 
>> guard `counter` somehow (use `AtomicInt` and such)?
>>
>> Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing 
>> messages one at time?
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
Hello hAkkers,

Simple example:
val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
  var counter = 0
  
  val outbound = b.add(Flow[String].map { str =>
counter += 1
str -> counter
  })
  val inbound = b.add(Flow[(String, Int)].map { pair =>
counter -= 1
pair._1
  })
  
  BidiShape.fromFlows(outbound, inbound)
})

Can I presume that contents of 'build block' is thread-safe or I need to 
guard `counter` somehow (use `AtomicInt` and such)?

Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing 
messages one at time?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Suddenly increasing inter-actor message delivery time

2016-09-08 Thread Alexey Shuksto
Hello hAkkers!

We've got some very strange message delivery time pattern between actors:

We have system with ~2000 type "A" working actors, each of whom have 1 to 
50 type "B" sub-workers (who do actual work, but do it very fast -- >1ms 
between request and response).

Every type A actor every second receives 1 to 50 (equal to number of 
sub-workers) payloads of 1 to 10 messages (1 to 500 messages total), 
chooses one type B actor per payload (1 to 10 messages), forwards them and 
interpreting B-actor result.

Number of "in-payload" messages is dependent of 'second-per-minute' -- most 
messages are received in 29th and 59th seconds.

Usually a total time of message processing is around 1 to 5ms for a full 
circle: in -> A -> B -> A -> out.
But in the "high load" times processing time quickly escalates up to 2.5 
_seconds_.

After some investigation and providing of separate dispatchers for type A 
(FJE, parallelism 8 min, 64 max, 3.0 factor) and type B (same 
configuration) actors, we were able to determine that type A actor still 
receive messages at very fast rate, but type B actors...
At the start of processing they receive payload almost momentarily (0 to 
1ms latency), but as processing continues, time to deliver message from A 
to B starts increasing up to 2.5 seconds mentioned above.

We tried to tweak type B dispatcher and set SingleConsumerOnlyUnboundedMailbox 
for them to no effect at all.

>From hardware side we have dual 6 core Intel server class processors (24 
cores total), JVM has 32GB of ram (no swapping), G1GC, gc pauses do not 
exceed 100ms and happens usually every 10-15 seconds.

Is there anything else that we can tweak or use to pinpoint the problem? 
May be some metric for average actor queue size and per-actor dispatcher 
time?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is there PromisedActorRef for everyone?

2016-06-28 Thread Alexey Shuksto
Yep but I will need to give some names to this proxy actors. Also, they 
would be supervised by '/user' and not by extension actor manager and won't 
form a nice and neat ActorPath tree like: '/user/extension/actor1', 
'/user/extension/actor2/' etc.

Basically, "proxy" actors is what I use right now, but looking for 
alternatives. :(

вторник, 28 июня 2016 г., 17:26:47 UTC+3 пользователь Patrik Nordwall 
написал:
>
>
> ActorRef subclasses are not intended to be implemented outside of Akka.
>
> Perhaps you can create a proxy actor for each destination. A real, 
> ordinary actor. It can handle resolve of selection, buffering, reconnects, 
> and all kind of fancy things you need.
>
> /Patrik
>   
>
-- 
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend  -  Reactive apps on the JVM
> Twitter: @patriknw
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is there PromisedActorRef for everyone?

2016-06-28 Thread Alexey Shuksto
Hi √,

I would very much like to use ActorSelection, except there is old post by 
Dr. Kuhn -- https://groups.google.com/forum/#!topic/akka-user/9e7LS72u9PA 
-- which urges to use ActorRefs where available.
And in current situation, ActorRef's are, indeed, available, but "later".

Mostly I was waiting for some of Akka developers "blessing" to use 
ActorSelection instead of ActorPath for 1-to-1 tells.

вторник, 28 июня 2016 г., 12:59:19 UTC+3 пользователь √ написал:
>
> Hi Alexey,
>
> How about using ActorSelection until you can't? :)
>
> On Tue, Jun 28, 2016 at 11:31 AM, Alexey Shuksto <sei...@gmail.com 
> > wrote:
>
>> Hello hAkkers,
>>
>> I have a component, Akka Extension, to be exact, which has one managing 
>> actor supervising several "worker" actors ()with different logic and roles).
>>
>> I want to be able to provide users of extension with stable ActorRef's of 
>> worker actors:
>>
>> class MyExtension(system: ExtendedActorSystem) extends Extension {
>>   val manager = system actorOf (ManagerActor.props(), "manager")
>>
>>   val actor1: ActorRef = ...
>>   val actor2: ActorRef = ...
>> }
>>
>>
>> I see two ways of doing so:
>>
>>1. Use ActorSelection instead of ActorRef -- I have stable names and 
>>all selections will be 1-to-1, but still there will be some overhead;
>>2. Provide users with Future[ActorRef] and let ManagerActor complete 
>>Promise[ActorRef], but than users will always have to deal with Future's 
>>.foreach, .map etc.
>>
>> Surely I don't want to Await till all ActorRef futures is resolved.
>>
>> I think that a good solution would be to wrap Future[ActorRef] as 
>> ActorRef instance, that delays all communication till the future is 
>> resolved and forwards after that.
>>
>> The question is -- maybe I'm overlooking some much more simpler and 
>> already existent solution?
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Is there PromisedActorRef for everyone?

2016-06-28 Thread Alexey Shuksto
Hello hAkkers,

I have a component, Akka Extension, to be exact, which has one managing 
actor supervising several "worker" actors ()with different logic and roles).

I want to be able to provide users of extension with stable ActorRef's of 
worker actors:

class MyExtension(system: ExtendedActorSystem) extends Extension {
  val manager = system actorOf (ManagerActor.props(), "manager")

  val actor1: ActorRef = ...
  val actor2: ActorRef = ...
}


I see two ways of doing so:

   1. Use ActorSelection instead of ActorRef -- I have stable names and all 
   selections will be 1-to-1, but still there will be some overhead;
   2. Provide users with Future[ActorRef] and let ManagerActor complete 
   Promise[ActorRef], but than users will always have to deal with Future's 
   .foreach, .map etc.

Surely I don't want to Await till all ActorRef futures is resolved.

I think that a good solution would be to wrap Future[ActorRef] as ActorRef 
instance, that delays all communication till the future is resolved and 
forwards after that.

The question is -- maybe I'm overlooking some much more simpler and already 
existent solution?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-stream] Is there a way to access Configuration from Flow?

2016-02-17 Thread Alexey Shuksto
Hello there,

Is there a way to access run-time system configuration in 
GraphDSL.create(...) similar to as any Actor could access one via 
`context.system.settings.config`?

I can, possibly do something like `flow.zip(Source.repeat(config))`, but 
maybe there is some approach that will not require to provide ActorSystem 
of Config to graph creation procedure?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-streams] ActorSubscriber to ActorPublisher flow -- how to?

2015-08-21 Thread Alexey Shuksto
Well, I was able to achieve needed behavior with following flow scheme:

Flow() { implicit builder ⇒
  val promises = builder add Source(() ⇒ Iterator continually Promise[O]())

  val fan = builder add Broadcast[Promise[O]](2)
  val zip = builder add Zip[I, Promise[O]]()

  val flow = builder add Flow[Promise[O]].mapAsync(1)(_.future)
  val sink = builder add Sink.actorSubscriber[I, Promise[O]](props)

  //+Flow: @formatter:off
  promises ~ fan ~ zip.in1
  flow ~ fan
  sink~ zip.out
  //-Flow: @formatter:on

  zip.in0 → flow.outlet
}


Where `I` and `O` are input and output types, respectfully, and `props` is 
`Props(..)` of enrichment Actor, who receives Tuple(i: I, p: Promise[O]), 
enriches i to instance of `O` and fulfills promise.

But there is still some questions I would very much like to know answers to:

1. Is there a way to create lazy repeated Source of (: = Promise[O])?
Logs show that Source(() = Iterator continually Promise[O]()) eagerly 
produces several promised values, and if actual inlet completes after very 
first instance of `I` they are discarded.

2. Is there a simpler way to produce Flow[O] from Flow[Promise[O]] than via 
`.mapAsync(1)(_.future)`?

3. Will Sink.actorSubscriber(..) shutdown underlying actor after stream is 
completed or it needs to be done from actor itself after OnComplete event? 
And what about errors?

четверг, 20 августа 2015 г., 18:33:43 UTC+3 пользователь Alexey Shuksto 
написал:

 Well, I want to simply connect them, but I want to connect Sink
 .actorSubscriber(...) ~ Source.actorPublisher(...), not other way 
 around, like you said.

 I've got Actor, that enriches HTTP Requests that our server receives. 
 After some enrichment, I need to be able to send request further down for 
 processing:

 ```
 val source = Source.single(/*HttpRequest*/)
 val sink = Sink.head[ProcessedRequest]

 val enrichment: Flow[HttpRequest, EnrichedRequest] = ???
 val processing: Flow[EnrichedRequest, ProcessedRequest] = ???

 source.via(enrichment).via(processing).to(sink)
 ```

 I want to be able to use our enrichment Actor in Flow[HttpRequest, 
 EnrichedRequest] stage. Actor is able to receive HttpRequest and reply (or 
 forward) ProcessedRequest, but I don't know how to create Flow from it.

  



 чт, 20 авг. 2015 г. в 18:20, Konrad Malawski konrad.malaw...@typesafe.com
 :

 Hello there Alexey,
 I'm not sure I understand what you want to achieve, is it simply to 
 connect such Sink to the Source?
 That's simply: 
 Source.actorPublisher(...).to(Sink.actorSubscriber(...)).run()

 Or do you mean to put another processing stage between them?
 If so then read about custom processing stages: 
 doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html


 -- 
 Cheers,
 Konrad Malawski
 Akka @ Typesafe

 On 20 August 2015 at 17:12:55, Alexey Shuksto (seig...@gmail.com) wrote:

 Hello there, 

 I could not find answer to this particular question neither in docs nor 
 in this group:

 Is it possible to wire Sink.actorSubscriber(...) into 
 Source.actorPublisher(...) in one (Partial-) Flow? And by 'wire' I mean to 
 somehow send message from ActorSubscriber, instantiated from given props, 
 to the 'same flow' instance of ActorPublisher?

 I was thinking about giving some unique Id to both of them and then 
 publish/subscribe events through `context.system.eventStream`, but then I 
 realized that all materialized flows would have same Id and messages would 
 mess up... :(

 In fact, I'm interested in even more simple task -- create an Actor, that 
 could act as PartialFlow[In, Out]: when you push a value into flow, actor 
 receives it, processes and sends further down the line.

 Of course, I always could do something like Flow[In] mapAsync (data = 
 (actor ? data).mapTo[classOf[Out]]) -- but I would really, really prefer to 
 do this message way.

 --
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to the Google Groups 
 Akka User List group.
 To unsubscribe from this group and stop receiving emails from it, send an 
 email to akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.



-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http

[akka-user] [akka-streams] ActorSubscriber to ActorPublisher flow -- how to?

2015-08-20 Thread Alexey Shuksto
Hello there,

I could not find answer to this particular question neither in docs nor in 
this group:

Is it possible to wire Sink.actorSubscriber(...) into 
Source.actorPublisher(...) in one (Partial-) Flow? And by 'wire' I mean to 
somehow send message from ActorSubscriber, instantiated from given props, 
to the 'same flow' instance of ActorPublisher?

I was thinking about giving some unique Id to both of them and then 
publish/subscribe events through `context.system.eventStream`, but then I 
realized that all materialized flows would have same Id and messages would 
mess up... :(

In fact, I'm interested in even more simple task -- create an Actor, that 
could act as PartialFlow[In, Out]: when you push a value into flow, actor 
receives it, processes and sends further down the line.

Of course, I always could do something like Flow[In] mapAsync (data = 
(actor ? data).mapTo[classOf[Out]]) -- but I would really, really prefer to 
do this message way.

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.