Re: [akka-user] Akka stream - Framing with multiple delimiters

2017-11-11 Thread regis leray
Oki, i will create a pull request to provide a function, good idea !

Le samedi 11 novembre 2017 03:35:28 UTC-5, Patrik Nordwall a écrit :
>
> You could do that by first replacing the various delimiters with one 
> common, in a map stage. That said, if this is typical framing requirement 
> we could perhaps add more power to the framing stage, e.g. a funtion 
> ByteString => Boolean. That would be a good community contribution.
>
> /Patrik
>
> lör 11 nov. 2017 kl. 04:31 skrev regis leray <regis...@gmail.com 
> >:
>
>> Hi,
>>
>> Currently the implementation 
>>
>> Framing.delimiter(ByteString("."), Int.MaxValue)
>>
>>
>> only accept one delimiter. I would like to be able to use many delimiters
>> . , ;
>>
>> to be able to parse such string
>> Lorem Ipsum is simply,Dummy text of the printing;And typesetting industry
>> .
>>
>> thanks
>>
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka stream - Framing with multiple delimiters

2017-11-10 Thread regis leray
Hi,

Currently the implementation 

Framing.delimiter(ByteString("."), Int.MaxValue)


only accept one delimiter. I would like to be able to use many delimiters
. , ;

to be able to parse such string
Lorem Ipsum is simply,Dummy text of the printing;And typesetting industry.

thanks


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Thanks a lot i succeed, to make it work...

val switch = new ValveSwitch {
  val callback = getAsyncCallback[A](push(out, _))

  override def open: Unit = {
mode = ValveMode.Open
bufferedElement.foreach(callback.invoke)
bufferedElement = Option.empty
  }

  override def close: Unit = {
mode = ValveMode.Closed
  }
}



The whole solution
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


Le vendredi 14 octobre 2016 08:13:21 UTC-4, Konrad Malawski a écrit :
>
> Have you read the blog post?
> In the async callback you can push(), that's what I meant.
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 14 October 2016 at 10:11:09, Konrad 'ktoso' Malawski (kto...@gmail.com 
> ) wrote:
>
> Please read this: 
> - http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis 
> - and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream
> -customize.html
>
> Specifically, your trigger should be implemented as async-callback, as it 
> comes from the outside but should "wake up" the stage to be able to push 
> data again.
> In your current setup it never "wakes up" since all pulls/pushes have been 
> processed - the stage has no idea it should do something once you called 
> open.
>
> -- Konrad
>
> W dniu piątek, 14 października 2016 09:03:54 UTC+2 użytkownik regis leray 
> napisał: 
>>
>> Hi, 
>>
>> Im currently trying to implement a valve Graph to manage pause/resume. We 
>> can control the behavior of the graph by using the MaterializeValue
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>>
>> Current implementation of the valve
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>> }
>>
>>
>> The current implementation is pretty simple, each time we are receiving a 
>> onPull demand we are requesting by doing pull(in).
>> When a onPush demand is received we are checking the current state 
>> - if Open we are doing the default behavior by doing push(out,element)
>> - if Close we are putting the element into a queue
>>
>> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>>   import shape._
>>
>>   var bufferedElement = List.empty[A]
>>
>>   val switch = new ValveSwitch {
>> override def open: Unit = {
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>> override def close: Unit = {
>>   mode = ValveMode.Closed
>> }
>>   }
>>
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val element = grab(in) //acquires the element that has been received 
>> during an onPush
>>   println(s"${mode} on push called with $element")
>>   if (mode == ValveMode.Open) {
>> push(out, element) //push directly the element on the out port
>>   } else {
>> bufferedElement = bufferedElement :+ element
>>   }
>> }
>>   })
>>
>>   setHandler(out, new OutHandler {
>> override def onPull(): Unit = {
>>   println("on pull called")
>>   pull(in) //request the next element on in port
>> }
>>   })
>> }
>>
>>
>> When we are resuming the valve my using the switch.open, we are pushing 
>> the element
>>
>> override def open: Unit = {
>>
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>>
>> The Current test is failing
>>
>> "A closed valve" should "emit only 3 elements after it has been open" in {
>>   
>> val (valve, probe) = Source(1 to 5)
>> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //

Re: [akka-user] Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Thanks a lot for your informations (and sorry for the double post).
Can you please tell me what is the way to "wake up the stage", it is not 
really obvious how to make that happen.

Thanks

Le vendredi 14 octobre 2016 04:09:12 UTC-4, Konrad Malawski a écrit :
>
> Please read this:
> - http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis 
> - and this: 
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html
>
> Specifically, your trigger should be implemented as async-callback, as it 
> comes from the outside but should "wake up" the stage to be able to push 
> data again.
> In your current setup it never "wakes up" since all pulls/pushes have been 
> processed - the stage has no idea it should do something once you called 
> open.
>
> On Fri, Oct 14, 2016 at 12:13 AM, regis leray <regis...@gmail.com 
> > wrote:
>
>> Hi,
>>
>> Im currently trying to implement a valve Graph to manage pause/resume. We 
>> can control the behavior of the graph by using the MaterializeValue
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>>
>> Current implementation of the valve
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>> }
>>
>>
>> The current implementation is pretty simple, each time we are receiving a 
>> onPull demand we are requesting by doing pull(in).
>> When a onPush demand is received we are checking the current state 
>> - if Open we are doing the default behavior by doing push(out,element)
>> - if Close we are putting the element into a queue
>>
>> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>>   import shape._
>>
>>   var bufferedElement = List.empty[A]
>>
>>   val switch = new ValveSwitch {
>> override def open: Unit = {
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>> override def close: Unit = {
>>   mode = ValveMode.Closed
>> }
>>   }
>>
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val element = grab(in) //acquires the element that has been received 
>> during an onPush
>>   println(s"${mode} on push called with $element")
>>   if (mode == ValveMode.Open) {
>> push(out, element) //push directly the element on the out port
>>   } else {
>> bufferedElement = bufferedElement :+ element
>>   }
>> }
>>   })
>>
>>   setHandler(out, new OutHandler {
>> override def onPull(): Unit = {
>>   println("on pull called")
>>   pull(in) //request the next element on in port
>> }
>>   })
>> }
>>
>>
>> When we are resuming the valve my using the switch.open, we are pushing 
>> the element
>>
>> override def open: Unit = {
>>
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>>
>> The Current test is failing
>>
>> "A closed valve" should "emit only 3 elements after it has been open" in {
>>   
>> val (valve, probe) = Source(1 to 5)
>> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
>> default is closed, dont push any message
>> .toMat(TestSink.probe[Int])(Keep.both)
>> .run()
>>
>>   probe.request(2)
>>   probe.expectNoMsg()
>>
>>   valve.open //open the valve should emit the previous
>>
>>
>>   probe.expectNext shouldEqual 1 //we never receive the element
>>   probe.expectNext shouldEqual 2
>>
>>   probe.request(3)
>>   probe.expectNext shouldEqual 3
>>   probe.expectNext shouldEqual 4
>>   probe.expectNext shouldE

[akka-user] Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Hi,

Im currently trying to implement a valve Graph to manage pause/resume. We 
can control the behavior of the graph by using the MaterializeValue

trait ValveSwitch {
  def open: Unit
  def close: Unit
}


Current implementation of the valve

class Valve[A](mode: ValveMode = ValveMode.Open) extends 
GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
  }

}


The current implementation is pretty simple, each time we are receiving a 
onPull demand we are requesting by doing pull(in).
When a onPush demand is received we are checking the current state 
- if Open we are doing the default behavior by doing push(out,element)
- if Close we are putting the element into a queue

private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends 
GraphStageLogic(shape){
  import shape._

  var bufferedElement = List.empty[A]

  val switch = new ValveSwitch {
override def open: Unit = {
  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? 
${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = List.empty
}

override def close: Unit = {
  mode = ValveMode.Closed
}
  }

  setHandler(in, new InHandler {
override def onPush(): Unit = {
  val element = grab(in) //acquires the element that has been received 
during an onPush
  println(s"${mode} on push called with $element")
  if (mode == ValveMode.Open) {
push(out, element) //push directly the element on the out port
  } else {
bufferedElement = bufferedElement :+ element
  }
}
  })

  setHandler(out, new OutHandler {
override def onPull(): Unit = {
  println("on pull called")
  pull(in) //request the next element on in port
}
  })
}


When we are resuming the valve my using the switch.open, we are pushing the 
element

override def open: Unit = {

  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = List.empty
}


The Current test is failing

"A closed valve" should "emit only 3 elements after it has been open" in {
  
val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
default is closed, dont push any message
.toMat(TestSink.probe[Int])(Keep.both)
.run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open //open the valve should emit the previous


  probe.expectNext shouldEqual 1 //we never receive the element
  probe.expectNext shouldEqual 2

  probe.request(3)
  probe.expectNext shouldEqual 3
  probe.expectNext shouldEqual 4
  probe.expectNext shouldEqual 5

  probe.expectComplete()
}


Here the console log

on pull called
Closed on push called with 1
pushing Some(1), out is available ? true

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 
seconds
at 
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
at 
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
at 
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
at 
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)


I'm suspecting the current code to have an issue when we are resuming the 
valve, it doesnt seems the push really works

val switch = new ValveSwitch {
override def open: Unit = {
  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? 
${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = Option.empty
}

override def close: Unit = {
  mode = ValveMode.Closed
}
  }


There is definitively something i dont catch up, if anyone could help me to 
see some light

Here the gist 
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938

Any help would be appreciated

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka stream - Flow in Pause

2016-10-14 Thread regis leray
I'm trying to implement the pause/resume in akka stream.
The current implementation, is buffering all elements into an List if the 
valve is in mode "close", if not we are pushing elements like the default 
behavior.

When we are resuming the flow by calling the materialize value switch.open, 
we change the mode to open and pushing all buffered elements.
Currently im suspecting this code to be the problem

val switch = new ValveSwitch {
  override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

bufferedElement.foreach(push(out, _))
bufferedElement = Option.empty
  }

...

}


In my unit test when the switch is changing close to open, the Sink never 
receive the elements

"A closed valve" should "emit only 3 elements after it has been open" in {
  val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right)
.toMat(TestSink.probe[Int])(Keep.both)
.run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open // we are pushing the buffered elements
  probe.expectNext shouldEqual 1 // this assert is failing !
  probe.expectNext shouldEqual 2

}


Any help would be really appreciated

Here
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>
> Hi, 
>
> I'm trying to implements a way to control a flow (start/stop), nothing was 
> implemented yet in the core of akka-stream
>
> My current implementation looks like this.
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), 
> Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
> extends GraphStageLogic(shape){
> import shape._
>
> var bufferedElement = List.empty[A]
>
> val switch = new ValveSwitch {
>   override def open: Unit = {
> mode = ValveMode.Open
> println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
> bufferedElement.foreach(push(out, _))
> bufferedElement = List.empty
>   }
>
>   override def close: Unit = {
> mode = ValveMode.Closed
>   }
> }
>
> setHandler(in, new InHandler {
>   override def onPush(): Unit = {
> val element = grab(in) //acquires the element that has been 
> received during an onPush
> println(s"${mode} on push called with $element")
> if (mode == ValveMode.Open) {
>   push(out, element) //push directly the element on the out port
> } else {
>   bufferedElement = bufferedElement :+ element
> }
>   }
> })
>
> setHandler(out, new OutHandler {
>   override def onPull(): Unit = {
> println("on pull called")
> pull(in) //request the next element on in port
>   }
> })
>   }
> }
>
> trait ValveMode
>
> object ValveMode {
>   case object Open extends ValveMode
>   case object Closed extends ValveMode
> }
>
> 
>
> My current unit test is failing. due to the fact when i open the valve, i 
> never received the previous message.
> It seems even if i push the element through ( valve.open ) the sink never 
> receive the element
>
> class ValveSpec extends FlatSpec {
>
>   implicit val system = ActorSystem()
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext = materializer.executionContext
>
>
>   "A closed valve" should "emit only 3 elements after it has been open" in 
> {
> val (valve, probe) = Source(1 to 3)
>   .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>   .toMat(TestSink.probe[Int])(Keep.both)
>   .run()
>
> probe.request(1)
> probe.expectNoMsg()
>
> valve.open
> probe.expectNext(1)
>
> probe.request(2)
> probe.expectNext(2, 3)
>
> probe.expectComplete()
>   }
> }
>
>
> Here the gist 
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka stream - Flow in Pause

2016-10-13 Thread regis leray
Hi, 

I'm trying to implements a way to control a flow (start/stop), nothing was 
implemented yet in the core of akka-stream

My current implementation looks like this.

trait ValveSwitch {
  def open: Unit
  def close: Unit
}

class Valve[A](mode: ValveMode = ValveMode.Open) extends 
GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), 
Outlet[A]("valve.out"))

  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
  }

  private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
extends GraphStageLogic(shape){
import shape._

var bufferedElement = List.empty[A]

val switch = new ValveSwitch {
  override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? 
${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = List.empty
  }

  override def close: Unit = {
mode = ValveMode.Closed
  }
}

setHandler(in, new InHandler {
  override def onPush(): Unit = {
val element = grab(in) //acquires the element that has been 
received during an onPush
println(s"${mode} on push called with $element")
if (mode == ValveMode.Open) {
  push(out, element) //push directly the element on the out port
} else {
  bufferedElement = bufferedElement :+ element
}
  }
})

setHandler(out, new OutHandler {
  override def onPull(): Unit = {
println("on pull called")
pull(in) //request the next element on in port
  }
})
  }
}

trait ValveMode

object ValveMode {
  case object Open extends ValveMode
  case object Closed extends ValveMode
}



My current unit test is failing. due to the fact when i open the valve, i 
never received the previous message.
It seems even if i push the element through ( valve.open ) the sink never 
receive the element

class ValveSpec extends FlatSpec {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = materializer.executionContext


  "A closed valve" should "emit only 3 elements after it has been open" in {
val (valve, probe) = Source(1 to 3)
  .viaMat(new Valve(ValveMode.Closed))(Keep.right)
  .toMat(TestSink.probe[Int])(Keep.both)
  .run()

probe.request(1)
probe.expectNoMsg()

valve.open
probe.expectNext(1)

probe.request(2)
probe.expectNext(2, 3)

probe.expectComplete()
  }
}


Here the gist 
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.