Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread Viktor Klang
Hi Regis,

Unfortunately, you're still doing unsafe things: you have a race condition
in your accesses to bufferedElement.
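
A minimal sketch of one way to avoid that race, assuming Akka Streams 2.4. The switch only sends the requested mode through an async callback, so the mode and the buffer are read and written solely inside the stage. The names `ValveLogic`, `modeChanged`, `buffered` and `flush`, and the single-element buffer, are illustrative choices and not the code from the gist.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

sealed trait ValveMode
object ValveMode {
  case object Open extends ValveMode
  case object Closed extends ValveMode
}

trait ValveSwitch {
  def open: Unit
  def close: Unit
}

class Valve[A](initialMode: ValveMode = ValveMode.Open)
    extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  val in: Inlet[A] = Inlet[A]("valve.in")
  val out: Outlet[A] = Outlet[A]("valve.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveLogic
    (logic, logic.switch)
  }

  // Hypothetical helper class: all mutable state lives here.
  private class ValveLogic extends GraphStageLogic(shape) {
    // Touched only by the handlers and the async callback below,
    // which the stage runs one at a time.
    private var mode: ValveMode = initialMode
    private var buffered: Option[A] = None

    // The handler runs inside the stage, even when invoke() is
    // called from another thread.
    private val modeChanged = getAsyncCallback[ValveMode] { newMode =>
      mode = newMode
      if (mode == ValveMode.Open) flush()
    }

    // The switch never reads or writes stage state; it only sends a message.
    val switch: ValveSwitch = new ValveSwitch {
      override def open: Unit = modeChanged.invoke(ValveMode.Open)
      override def close: Unit = modeChanged.invoke(ValveMode.Closed)
    }

    // Emit the element held back while the valve was closed, if any.
    private def flush(): Unit =
      if (buffered.isDefined && isAvailable(out)) {
        push(out, buffered.get)
        buffered = None
        if (isClosed(in)) completeStage() // upstream finished while we were holding it
      }

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        if (mode == ValveMode.Open) push(out, element)
        else buffered = Some(element)
      }
      // Do not complete while an element is still waiting to be emitted.
      override def onUpstreamFinish(): Unit =
        if (buffered.isEmpty) completeStage()
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}
```

Because the stage pulls upstream only when downstream has signalled demand, at most one element is held back while the valve is closed. One caveat, stated as an assumption to check against your Akka version: on 2.4, invoking an async callback before the stage has started may fail, so the switch should only be used once the stream is running.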

On Fri, Oct 14, 2016 at 3:56 PM, regis leray wrote:

> Thanks a lot, I succeeded in making it work...
>
> val switch = new ValveSwitch {
>   val callback = getAsyncCallback[A](push(out, _))
>
>   override def open: Unit = {
>     mode = ValveMode.Open
>     bufferedElement.foreach(callback.invoke)
>     bufferedElement = Option.empty
>   }
>
>   override def close: Unit = {
>     mode = ValveMode.Closed
>   }
> }
>
>
>
> The whole solution
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938

Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Thanks a lot, I succeeded in making it work...

val switch = new ValveSwitch {
  val callback = getAsyncCallback[A](push(out, _))

  override def open: Unit = {
    mode = ValveMode.Open
    bufferedElement.foreach(callback.invoke)
    bufferedElement = Option.empty
  }

  override def close: Unit = {
    mode = ValveMode.Closed
  }
}



The whole solution
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


On Friday, 14 October 2016 at 08:13:21 UTC-4, Konrad Malawski wrote:
>
> Have you read the blog post?
> In the async callback you can push(), that's what I meant.

Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread Konrad Malawski
Have you read the blog post?
In the async callback you can push(), that's what I meant.

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend

On 14 October 2016 at 10:11:09, Konrad 'ktoso' Malawski (ktos...@gmail.com)
wrote:

Please read this:
- http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis
- and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html

Specifically, your trigger should be implemented as an async callback: it
comes from outside the stream but has to "wake up" the stage so that it can
push data again.
In your current setup the stage never "wakes up", since all pulls/pushes have
been processed and the stage has no idea it should do something once you have
called open.
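
The wake-up mechanism described here can be isolated in a small sketch, assuming Akka Streams 2.4. `ExternalTicks` is a hypothetical source, not something from this thread: each call to its materialized function is delivered to the stage through `getAsyncCallback`, and the handler then runs inside the stage, where calling `push` is allowed.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}

// Hypothetical example stage: emits 1, 2, 3, ... with one element
// per external call to the materialized function.
class ExternalTicks extends GraphStageWithMaterializedValue[SourceShape[Int], () => Unit] {

  val out: Outlet[Int] = Outlet[Int]("ticks.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, () => Unit) = {
    val logic = new TickLogic
    // The outside world only gets a function that sends a message to the stage.
    (logic, () => logic.onTick.invoke(()))
  }

  private class TickLogic extends GraphStageLogic(shape) {
    private var emitted = 0 // number of ticks pushed downstream so far
    private var pending = 0 // ticks received but not yet pushed

    // Runs inside the stage: this is what "wakes it up".
    val onTick = getAsyncCallback[Unit] { _ =>
      pending += 1
      emitIfPossible()
    }

    // Push one element only if a tick is waiting and downstream has demand.
    private def emitIfPossible(): Unit =
      if (pending > 0 && isAvailable(out)) {
        pending -= 1
        emitted += 1
        push(out, emitted)
      }

    setHandler(out, new OutHandler {
      override def onPull(): Unit = emitIfPossible()
    })
  }
}
```

Calling `push` directly from the caller's thread, as the original `open` did, bypasses the stage entirely: nothing schedules the stage to run, so the element is never signalled downstream.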

-- Konrad

On Friday, 14 October 2016 at 09:03:54 UTC+2, regis leray wrote:
>
> Hi,
>
> I'm currently trying to implement a valve graph stage to manage pause/resume.
> We can control the behavior of the graph through the materialized value:
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
>
> Current implementation of the valve
>
> class Valve[A](mode: ValveMode = ValveMode.Open)
>     extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
>     val logic = new ValveGraphStageLogic(shape, mode)
>     (logic, logic.switch)
>   }
>
> }
>
>
> The current implementation is pretty simple: each time we receive an
> onPull signal, we request an element by calling pull(in).
> When an onPush signal is received, we check the current state:
> - if Open, we do the default behavior and call push(out, element)
> - if Closed, we put the element into a queue
>
> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape) {
>   import shape._
>
>   var bufferedElement = List.empty[A]
>
>   val switch = new ValveSwitch {
>     override def open: Unit = {
>       mode = ValveMode.Open
>       println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>       bufferedElement.foreach(push(out, _))
>       bufferedElement = List.empty
>     }
>
>     override def close: Unit = {
>       mode = ValveMode.Closed
>     }
>   }
>
>   setHandler(in, new InHandler {
>     override def onPush(): Unit = {
>       val element = grab(in) // acquires the element that has been received during an onPush
>       println(s"${mode} on push called with $element")
>       if (mode == ValveMode.Open) {
>         push(out, element) // push the element directly to the out port
>       } else {
>         bufferedElement = bufferedElement :+ element
>       }
>     }
>   })
>
>   setHandler(out, new OutHandler {
>     override def onPull(): Unit = {
>       println("on pull called")
>       pull(in) // request the next element on the in port
>     }
>   })
> }
>
>
> When we resume the valve by calling switch.open, we push the buffered
> element:
>
> override def open: Unit = {
>
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
>
> The current test is failing:
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>
>   val (valve, probe) = Source(1 to 5)
>     .viaMat(new Valve(ValveMode.Closed))(Keep.right) // the valve starts closed and does not push any message
>     .toMat(TestSink.probe[Int])(Keep.both)
>     .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open // opening the valve should emit the previously buffered element
>
>
>   probe.expectNext shouldEqual 1 // we never receive the element
>   probe.expectNext shouldEqual 2
>
>   probe.request(3)
>   probe.expectNext shouldEqual 3
>   probe.expectNext shouldEqual 4
>   probe.expectNext shouldEqual 5
>
>   probe.expectComplete()
> }
>
>
> Here is the console log:
>
> on pull called
> Closed on push called with 1
> pushing Some(1), out is available ? true
>
> Expected OnNext(_), yet no element signaled during 3 seconds
> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
> at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
> at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
> at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
> at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>
>
> I suspect the current code has an issue when we resume the valve: the
> push doesn't seem to actually work.
>
> val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
>