[akka-user] Re: Akka stream - Flow in Pause

2016-10-14 Thread Konrad 'ktoso' Malawski
Seems like double posted the same question?
Please don't double post the same question :-)

I answered in that 
thread: https://groups.google.com/forum/#!topic/akka-user/7VqlP1w_CN8

-- 
Konrad
Akka Team

W dniu piątek, 14 października 2016 09:02:59 UTC+2 użytkownik regis leray 
napisał:
>
> I'm trying to implement the pause/resume in akka stream.
> The current implementation, is buffering all elements into an List if the 
> valve is in mode "close", if not we are pushing elements like the default 
> behavior.
>
> When we are resuming the flow by calling the materialize value 
> switch.open, we change the mode to open and pushing all buffered elements.
> Currently im suspecting this code to be the problem
>
> val switch = new ValveSwitch {
>   override def open: Unit = {
> mode = ValveMode.Open
> println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
> bufferedElement.foreach(push(out, _))
> bufferedElement = Option.empty
>   }
>
> ...
>
> }
>
>
> In my unit test when the switch is changing close to open, the Sink never 
> receive the elements
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>   val (valve, probe) = Source(1 to 5)
> .viaMat(new Valve(ValveMode.Closed))(Keep.right)
> .toMat(TestSink.probe[Int])(Keep.both)
> .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open // we are pushing the buffered elements
>   probe.expectNext shouldEqual 1 // this assert is failing !
>   probe.expectNext shouldEqual 2
>
> }
>
>
> Any help would be really appreciated
>
> Here
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>
>
> Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>>
>> Hi, 
>>
>> I'm trying to implements a way to control a flow (start/stop), nothing 
>> was implemented yet in the core of akka-stream
>>
>> My current implementation looks like this.
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>> import shape._
>>
>> var bufferedElement = List.empty[A]
>>
>> val switch = new ValveSwitch {
>>   override def open: Unit = {
>> mode = ValveMode.Open
>> println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>> bufferedElement.foreach(push(out, _))
>> bufferedElement = List.empty
>>   }
>>
>>   override def close: Unit = {
>> mode = ValveMode.Closed
>>   }
>> }
>>
>> setHandler(in, new InHandler {
>>   override def onPush(): Unit = {
>> val element = grab(in) //acquires the element that has been 
>> received during an onPush
>> println(s"${mode} on push called with $element")
>> if (mode == ValveMode.Open) {
>>   push(out, element) //push directly the element on the out port
>> } else {
>>   bufferedElement = bufferedElement :+ element
>> }
>>   }
>> })
>>
>> setHandler(out, new OutHandler {
>>   override def onPull(): Unit = {
>> println("on pull called")
>> pull(in) //request the next element on in port
>>   }
>> })
>>   }
>> }
>>
>> trait ValveMode
>>
>> object ValveMode {
>>   case object Open extends ValveMode
>>   case object Closed extends ValveMode
>> }
>>
>> 
>>
>> My current unit test is failing. due to the fact when i open the valve, i 
>> never received the previous message.
>> It seems even if i push the element through ( valve.open ) the sink 
>> never receive the element
>>
>> class ValveSpec extends FlatSpec {
>>
>>   implicit val system = ActorSystem()
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = materializer.executionContext
>>
>>
>>   "A closed valve" should "emit only 3 elements after it has been open" 
>> in {
>> val (valve, probe) = Source(1 to 3)
>>   .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>>   .toMat(TestSink.probe[Int])(Keep.both)
>>   .run()
>>
>> probe.request(1)
>> probe.expectNoMsg()
>>
>> valve.open
>> probe.expectNext(1)
>>
>> probe.request(2)
>> probe.expectNext(2, 3)
>>
>> probe.expectComplete()
>>   }
>> }
>>
>>
>> Here the gist 
>> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> 

[akka-user] Re: Akka stream - Flow in Pause

2016-10-14 Thread regis leray
I'm trying to implement the pause/resume in akka stream.
The current implementation, is buffering all elements into an List if the 
valve is in mode "close", if not we are pushing elements like the default 
behavior.

When we are resuming the flow by calling the materialize value switch.open, 
we change the mode to open and pushing all buffered elements.
Currently im suspecting this code to be the problem

val switch = new ValveSwitch {
  override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

bufferedElement.foreach(push(out, _))
bufferedElement = Option.empty
  }

...

}


In my unit test when the switch is changing close to open, the Sink never 
receive the elements

"A closed valve" should "emit only 3 elements after it has been open" in {
  val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right)
.toMat(TestSink.probe[Int])(Keep.both)
.run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open // we are pushing the buffered elements
  probe.expectNext shouldEqual 1 // this assert is failing !
  probe.expectNext shouldEqual 2

}


Any help would be really appreciated

Here
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>
> Hi, 
>
> I'm trying to implements a way to control a flow (start/stop), nothing was 
> implemented yet in the core of akka-stream
>
> My current implementation looks like this.
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), 
> Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
> extends GraphStageLogic(shape){
> import shape._
>
> var bufferedElement = List.empty[A]
>
> val switch = new ValveSwitch {
>   override def open: Unit = {
> mode = ValveMode.Open
> println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
> bufferedElement.foreach(push(out, _))
> bufferedElement = List.empty
>   }
>
>   override def close: Unit = {
> mode = ValveMode.Closed
>   }
> }
>
> setHandler(in, new InHandler {
>   override def onPush(): Unit = {
> val element = grab(in) //acquires the element that has been 
> received during an onPush
> println(s"${mode} on push called with $element")
> if (mode == ValveMode.Open) {
>   push(out, element) //push directly the element on the out port
> } else {
>   bufferedElement = bufferedElement :+ element
> }
>   }
> })
>
> setHandler(out, new OutHandler {
>   override def onPull(): Unit = {
> println("on pull called")
> pull(in) //request the next element on in port
>   }
> })
>   }
> }
>
> trait ValveMode
>
> object ValveMode {
>   case object Open extends ValveMode
>   case object Closed extends ValveMode
> }
>
> 
>
> My current unit test is failing. due to the fact when i open the valve, i 
> never received the previous message.
> It seems even if i push the element through ( valve.open ) the sink never 
> receive the element
>
> class ValveSpec extends FlatSpec {
>
>   implicit val system = ActorSystem()
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext = materializer.executionContext
>
>
>   "A closed valve" should "emit only 3 elements after it has been open" in 
> {
> val (valve, probe) = Source(1 to 3)
>   .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>   .toMat(TestSink.probe[Int])(Keep.both)
>   .run()
>
> probe.request(1)
> probe.expectNoMsg()
>
> valve.open
> probe.expectNext(1)
>
> probe.request(2)
> probe.expectNext(2, 3)
>
> probe.expectComplete()
>   }
> }
>
>
> Here the gist 
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.