Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread Viktor Klang
Hi Regis,

unfortunately you're still doing unsafe things (you have a race-condition
in your accesses to bufferedElement)

On Fri, Oct 14, 2016 at 3:56 PM, regis leray  wrote:

> Thanks a lot i succeed, to make it work...
>
> val switch = new ValveSwitch {
>   val callback = getAsyncCallback[A](push(out, _))
>
>   override def open: Unit = {
> mode = ValveMode.Open
> bufferedElement.foreach(callback.invoke)
> bufferedElement = Option.empty
>   }
>
>   override def close: Unit = {
> mode = ValveMode.Closed
>   }
> }
>
>
>
> The whole solution
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>
>
> Le vendredi 14 octobre 2016 08:13:21 UTC-4, Konrad Malawski a écrit :
>>
>> Have you read the blog post?
>> In the async callback you can push(), that's what I meant.
>>
>> --
>> Konrad `ktoso` Malawski
>> Akka  @ Lightbend 
>>
>> On 14 October 2016 at 10:11:09, Konrad 'ktoso' Malawski (kto...@gmail.com)
>> wrote:
>>
>> Please read this:
>> - http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis
>> - and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-c
>> ustomize.html
>>
>> Specifically, your trigger should be implemented as async-callback, as it
>> comes from the outside but should "wake up" the stage to be able to push
>> data again.
>> In your current setup it never "wakes up" since all pulls/pushes have
>> been processed - the stage has no idea it should do something once you
>> called open.
>>
>> -- Konrad
>>
>> W dniu piątek, 14 października 2016 09:03:54 UTC+2 użytkownik regis leray
>> napisał:
>>>
>>> Hi,
>>>
>>> Im currently trying to implement a valve Graph to manage pause/resume.
>>> We can control the behavior of the graph by using the MaterializeValue
>>>
>>> trait ValveSwitch {
>>>   def open: Unit
>>>   def close: Unit
>>> }
>>>
>>>
>>> Current implementation of the valve
>>>
>>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>>
>>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>>> Outlet[A]("valve.out"))
>>>
>>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>>> Attributes): (GraphStageLogic, ValveSwitch) = {
>>> val logic = new ValveGraphStageLogic(shape, mode)
>>> (logic, logic.switch)
>>>   }
>>>
>>> }
>>>
>>>
>>> The current implementation is pretty simple, each time we are receiving
>>> a onPull demand we are requesting by doing pull(in).
>>> When a onPush demand is received we are checking the current state
>>> - if Open we are doing the default behavior by doing push(out,element)
>>> - if Close we are putting the element into a queue
>>>
>>> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>>> extends GraphStageLogic(shape){
>>>   import shape._
>>>
>>>   var bufferedElement = List.empty[A]
>>>
>>>   val switch = new ValveSwitch {
>>> override def open: Unit = {
>>>   mode = ValveMode.Open
>>>   println(s"pushing $bufferedElement, out is available ? 
>>> ${isAvailable(out)}")
>>>
>>>   bufferedElement.foreach(push(out, _))
>>>   bufferedElement = List.empty
>>> }
>>>
>>> override def close: Unit = {
>>>   mode = ValveMode.Closed
>>> }
>>>   }
>>>
>>>   setHandler(in, new InHandler {
>>> override def onPush(): Unit = {
>>>   val element = grab(in) //acquires the element that has been received 
>>> during an onPush
>>>   println(s"${mode} on push called with $element")
>>>   if (mode == ValveMode.Open) {
>>> push(out, element) //push directly the element on the out port
>>>   } else {
>>> bufferedElement = bufferedElement :+ element
>>>   }
>>> }
>>>   })
>>>
>>>   setHandler(out, new OutHandler {
>>> override def onPull(): Unit = {
>>>   println("on pull called")
>>>   pull(in) //request the next element on in port
>>> }
>>>   })
>>> }
>>>
>>>
>>> When we are resuming the valve my using the switch.open, we are pushing
>>> the element
>>>
>>> override def open: Unit = {
>>>
>>>   mode = ValveMode.Open
>>>   println(s"pushing $bufferedElement, out is available ? 
>>> ${isAvailable(out)}")
>>>
>>>   bufferedElement.foreach(push(out, _))
>>>   bufferedElement = List.empty
>>> }
>>>
>>>
>>> The Current test is failing
>>>
>>> "A closed valve" should "emit only 3 elements after it has been open" in {
>>>
>>> val (valve, probe) = Source(1 to 5)
>>> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
>>> default is closed, dont push any message
>>> .toMat(TestSink.probe[Int])(Keep.both)
>>> .run()
>>>
>>>   probe.request(2)
>>>   probe.expectNoMsg()
>>>
>>>   valve.open //open the valve should emit the previous
>>>
>>>
>>>   probe.expectNext shouldEqual 1 //we never receive the element
>>>   probe.expectNext shouldEqual 2
>>>
>>>   probe.request(3)
>>>   probe.expectNext shouldEqual 3
>>>   

[akka-user] Issue regarding AddressFromURIString and DNS hostnames

2016-10-14 Thread 'Carl Pulley' via Akka User List
The following code generates a java.net.MalformedURLException:

  AddressFromURIString("akka.tcp://Example@node_1:2552”)

Looking at the underlying implementation of AddressFromURIString, the exception 
derives from the use of java.net.URI to parse this string. Which in turn, 
correctly throws an exception according to RFC 2396 (because URI hosts may not 
contain underscore characters).

However, it is currently perfectly permissible to create an instance of 
akka.actor.Address using a DNS hostname (which may contain underscores).

So, at first blush, it would appear that we need something like 
AddressFromDNSString (or similar)?

For myself, I’m hitting this problem whilst building Akka clusters using 
docker-compose with the scale command. In this use case, I have to expect the 
presence of underscores since Docker libnetworking defines Docker containers 
based on DNS names.

Any help or advice here greatly appreciated.

Many thanks,

  Carl.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Implementation of Persistent Actor with initial state

2016-10-14 Thread Jakub Liska
Hey,

what is the best practice to do in this hypothetical scenario : 
1) Say you have a time series pipeline that started at 2014 and created 
persistent state on S3 and other DB systems
2) You can introspect these storages and know what partitions already 
exists in all of them
3) The persistent actor's job would be scheduling tasks for newly added 
partitions (next minute/hour/day, etc.)
3) Now at 2016 you deploy a persistent Actor that will hold state about 
completness (existing partitions) of these storages and keep up with their 
progress

The way I see it, when this persistent Actor boots up, it will have 2 
choices :
a) either it starts for the very first time, it will have to replay all 
DomainEvents for all historical partitions from 2014 to itself to 
initialize it's state to the current view of the world in 2016 
b) or it restarts or crashes and its state is replayed from the journal 
implicitly

Now I cannot find any reference of how this should be done. The only 
solution that comes to mind is to use persistence-query and obtaining 
Journal : 

   readJournal.currentPersistenceIds()
   readJournal.eventsByPersistenceId("user-us-1337")
   1. 
   And if the result is empty, then it will reconstructs the history.

Is this a way to go?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Thanks a lot i succeed, to make it work...

val switch = new ValveSwitch {
  val callback = getAsyncCallback[A](push(out, _))

  override def open: Unit = {
mode = ValveMode.Open
bufferedElement.foreach(callback.invoke)
bufferedElement = Option.empty
  }

  override def close: Unit = {
mode = ValveMode.Closed
  }
}



The whole solution
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


Le vendredi 14 octobre 2016 08:13:21 UTC-4, Konrad Malawski a écrit :
>
> Have you read the blog post?
> In the async callback you can push(), that's what I meant.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 14 October 2016 at 10:11:09, Konrad 'ktoso' Malawski (kto...@gmail.com 
> ) wrote:
>
> Please read this: 
> - http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis 
> - and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream
> -customize.html
>
> Specifically, your trigger should be implemented as async-callback, as it 
> comes from the outside but should "wake up" the stage to be able to push 
> data again.
> In your current setup it never "wakes up" since all pulls/pushes have been 
> processed - the stage has no idea it should do something once you called 
> open.
>
> -- Konrad
>
> W dniu piątek, 14 października 2016 09:03:54 UTC+2 użytkownik regis leray 
> napisał: 
>>
>> Hi, 
>>
>> Im currently trying to implement a valve Graph to manage pause/resume. We 
>> can control the behavior of the graph by using the MaterializeValue
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>>
>> Current implementation of the valve
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>> }
>>
>>
>> The current implementation is pretty simple, each time we are receiving a 
>> onPull demand we are requesting by doing pull(in).
>> When a onPush demand is received we are checking the current state 
>> - if Open we are doing the default behavior by doing push(out,element)
>> - if Close we are putting the element into a queue
>>
>> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>>   import shape._
>>
>>   var bufferedElement = List.empty[A]
>>
>>   val switch = new ValveSwitch {
>> override def open: Unit = {
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>> override def close: Unit = {
>>   mode = ValveMode.Closed
>> }
>>   }
>>
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val element = grab(in) //acquires the element that has been received 
>> during an onPush
>>   println(s"${mode} on push called with $element")
>>   if (mode == ValveMode.Open) {
>> push(out, element) //push directly the element on the out port
>>   } else {
>> bufferedElement = bufferedElement :+ element
>>   }
>> }
>>   })
>>
>>   setHandler(out, new OutHandler {
>> override def onPull(): Unit = {
>>   println("on pull called")
>>   pull(in) //request the next element on in port
>> }
>>   })
>> }
>>
>>
>> When we are resuming the valve my using the switch.open, we are pushing 
>> the element
>>
>> override def open: Unit = {
>>
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>>
>> The Current test is failing
>>
>> "A closed valve" should "emit only 3 elements after it has been open" in {
>>   
>> val (valve, probe) = Source(1 to 5)
>> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
>> default is closed, dont push any message
>> .toMat(TestSink.probe[Int])(Keep.both)
>> .run()
>>
>>   probe.request(2)
>>   probe.expectNoMsg()
>>
>>   valve.open //open the valve should emit the previous
>>
>>
>>   probe.expectNext shouldEqual 1 //we never receive the element
>>   probe.expectNext shouldEqual 2
>>
>>   probe.request(3)
>>   probe.expectNext shouldEqual 3
>>   probe.expectNext shouldEqual 4
>>   probe.expectNext shouldEqual 5
>>
>>   probe.expectComplete()
>> }
>>
>>
>> Here the console log
>>
>> on pull called
>> Closed on push called with 1
>> pushing Some(1), out is available ? true
>>
>> Expected OnNext(_), yet no element signaled during 3 seconds
>> java.lang.AssertionError: Expected OnNext(_), yet no element signaled 

Re: [akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread Konrad Malawski
Have you read the blog post?
In the async callback you can push(), that's what I meant.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 14 October 2016 at 10:11:09, Konrad 'ktoso' Malawski (ktos...@gmail.com)
wrote:

Please read this:
- http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis
- and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-
customize.html

Specifically, your trigger should be implemented as async-callback, as it
comes from the outside but should "wake up" the stage to be able to push
data again.
In your current setup it never "wakes up" since all pulls/pushes have been
processed - the stage has no idea it should do something once you called
open.

-- Konrad

W dniu piątek, 14 października 2016 09:03:54 UTC+2 użytkownik regis leray
napisał:
>
> Hi,
>
> Im currently trying to implement a valve Graph to manage pause/resume. We
> can control the behavior of the graph by using the MaterializeValue
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
>
> Current implementation of the valve
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
> }
>
>
> The current implementation is pretty simple, each time we are receiving a
> onPull demand we are requesting by doing pull(in).
> When a onPush demand is received we are checking the current state
> - if Open we are doing the default behavior by doing push(out,element)
> - if Close we are putting the element into a queue
>
> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends 
> GraphStageLogic(shape){
>   import shape._
>
>   var bufferedElement = List.empty[A]
>
>   val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
> override def close: Unit = {
>   mode = ValveMode.Closed
> }
>   }
>
>   setHandler(in, new InHandler {
> override def onPush(): Unit = {
>   val element = grab(in) //acquires the element that has been received 
> during an onPush
>   println(s"${mode} on push called with $element")
>   if (mode == ValveMode.Open) {
> push(out, element) //push directly the element on the out port
>   } else {
> bufferedElement = bufferedElement :+ element
>   }
> }
>   })
>
>   setHandler(out, new OutHandler {
> override def onPull(): Unit = {
>   println("on pull called")
>   pull(in) //request the next element on in port
> }
>   })
> }
>
>
> When we are resuming the valve my using the switch.open, we are pushing
> the element
>
> override def open: Unit = {
>
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
>
> The Current test is failing
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>
> val (valve, probe) = Source(1 to 5)
> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
> default is closed, dont push any message
> .toMat(TestSink.probe[Int])(Keep.both)
> .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open //open the valve should emit the previous
>
>
>   probe.expectNext shouldEqual 1 //we never receive the element
>   probe.expectNext shouldEqual 2
>
>   probe.request(3)
>   probe.expectNext shouldEqual 3
>   probe.expectNext shouldEqual 4
>   probe.expectNext shouldEqual 5
>
>   probe.expectComplete()
> }
>
>
> Here the console log
>
> on pull called
> Closed on push called with 1
> pushing Some(1), out is available ? true
>
> Expected OnNext(_), yet no element signaled during 3 seconds
> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 
> 3 seconds
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>
>
> I'm suspecting the current code to have an issue when we are resuming the
> valve, it doesnt seems the push really works
>
> val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> 

Re: [akka-user] Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Thanks a lot for your informations (and sorry for the double post).
Can you please tell me what is the way to "wake up the stage", it is not 
really obvious how to make that happen.

Thanks

Le vendredi 14 octobre 2016 04:09:12 UTC-4, Konrad Malawski a écrit :
>
> Please read this:
> - http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis 
> - and this: 
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html
>
> Specifically, your trigger should be implemented as async-callback, as it 
> comes from the outside but should "wake up" the stage to be able to push 
> data again.
> In your current setup it never "wakes up" since all pulls/pushes have been 
> processed - the stage has no idea it should do something once you called 
> open.
>
> On Fri, Oct 14, 2016 at 12:13 AM, regis leray  > wrote:
>
>> Hi,
>>
>> Im currently trying to implement a valve Graph to manage pause/resume. We 
>> can control the behavior of the graph by using the MaterializeValue
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>>
>> Current implementation of the valve
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>> }
>>
>>
>> The current implementation is pretty simple, each time we are receiving a 
>> onPull demand we are requesting by doing pull(in).
>> When a onPush demand is received we are checking the current state 
>> - if Open we are doing the default behavior by doing push(out,element)
>> - if Close we are putting the element into a queue
>>
>> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>>   import shape._
>>
>>   var bufferedElement = List.empty[A]
>>
>>   val switch = new ValveSwitch {
>> override def open: Unit = {
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>> override def close: Unit = {
>>   mode = ValveMode.Closed
>> }
>>   }
>>
>>   setHandler(in, new InHandler {
>> override def onPush(): Unit = {
>>   val element = grab(in) //acquires the element that has been received 
>> during an onPush
>>   println(s"${mode} on push called with $element")
>>   if (mode == ValveMode.Open) {
>> push(out, element) //push directly the element on the out port
>>   } else {
>> bufferedElement = bufferedElement :+ element
>>   }
>> }
>>   })
>>
>>   setHandler(out, new OutHandler {
>> override def onPull(): Unit = {
>>   println("on pull called")
>>   pull(in) //request the next element on in port
>> }
>>   })
>> }
>>
>>
>> When we are resuming the valve my using the switch.open, we are pushing 
>> the element
>>
>> override def open: Unit = {
>>
>>   mode = ValveMode.Open
>>   println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>>
>>   bufferedElement.foreach(push(out, _))
>>   bufferedElement = List.empty
>> }
>>
>>
>> The Current test is failing
>>
>> "A closed valve" should "emit only 3 elements after it has been open" in {
>>   
>> val (valve, probe) = Source(1 to 5)
>> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
>> default is closed, dont push any message
>> .toMat(TestSink.probe[Int])(Keep.both)
>> .run()
>>
>>   probe.request(2)
>>   probe.expectNoMsg()
>>
>>   valve.open //open the valve should emit the previous
>>
>>
>>   probe.expectNext shouldEqual 1 //we never receive the element
>>   probe.expectNext shouldEqual 2
>>
>>   probe.request(3)
>>   probe.expectNext shouldEqual 3
>>   probe.expectNext shouldEqual 4
>>   probe.expectNext shouldEqual 5
>>
>>   probe.expectComplete()
>> }
>>
>>
>> Here the console log
>>
>> on pull called
>> Closed on push called with 1
>> pushing Some(1), out is available ? true
>>
>> Expected OnNext(_), yet no element signaled during 3 seconds
>> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 
>> 3 seconds
>> at 
>> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
>> at 
>> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
>> at 
>> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
>> at 
>> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>>
>>
>> I'm suspecting the current code to have an issue when we are resuming the 
>> valve, it doesnt seems the push really works
>>
>> val 

Re: [akka-user] Re: Replaying journal events without actually recovering Actors?

2016-10-14 Thread Akka Team
Yup, that's one way to do it - cheers!

On Thu, Oct 13, 2016 at 10:07 PM, Spencer Judge 
wrote:

> I can answered my own question here. It pretty much just came down to my
> lack of knowledge around the streams API, which I hadn't used yet. Changing
> the test to this works perfectly:
>
> test("test journal backwards compatibility", EndToEndTest) {
>   val readJournal = 
> PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
> LeveldbReadJournal.Identifier)
>   implicit val mat = ActorMaterializer()(system)
>   val allPersistenceIDs = readJournal.currentPersistenceIds().map { pid =>
> val eventsForID = readJournal.currentEventsByPersistenceId(pid)
> val res = eventsForID.runWith(Sink.foreach { e => e.event })
> // Simply waiting for the result will blow up with an exception if we 
> couldn't deserialize.
> Await.result(res, 300.millis)
>   }
>   Await.result(allPersistenceIDs.runWith(Sink.seq), 5.seconds)
> }
>
>
>
> On Wednesday, October 12, 2016 at 4:43:39 PM UTC-7, Spencer Judge wrote:
>>
>> Hello all,
>>
>> In order to test that no unaccounted-for serialization changes have
>> happen before we deploy our service, I need some way for us to *attempt* to
>> deserialize all currently stored events in our event journal and snapshots.
>>
>> Currently, we just use the local leveldb plugin with local snapshot
>> storage, which has been totally adequate for our purpose.
>>
>> I'm trying to do this by using a PersistenceQuery, but I'm totally unable
>> to get the persistence query to actually... return anything. My code is
>> pasted below. The things hidden behind the "PersistentTestKit" object just
>> handle some data cleaning before the tests, etc, and when I call
>> getNewConfig, that's just modifying the akka config to use the location of
>> the stored journal/snapshots we want to test against.
>>
>> As you can see, I added a simple persistent actor just to test if I could
>> get the query to return events from something not even related to the
>> existing data, and even it doesn't show up in the query (nothing does).
>> This is the output:
>>
>> Recover persisted a thing!  // As many times as I've already run the
>> test, as expected.
>> Recover RecoveryCompleted
>> Persisting a thing!
>> Found persistence IDs:
>> Found persistence IDs:
>> Found persistence IDs:
>>
>> Clearly, I'm missing something, but I'm not sure what. If I inspect the
>> config given to the system, everything looks right. The storage locations
>> are right, the plugins are right, etc. Can someone point me in the right
>> direction?
>>
>> Alternatively, is there a better way to do this? I will happily do
>> something else entirely if PersistenceQuery isn't the right fit.
>>
>>
>> object PersistCompatIT {
>>   val persistBackupLoc = ".exported_state"
>>   val dumpFile = new File(PersistentTestKit.findRoot(), 
>> persistBackupLoc).getAbsoluteFile
>>   val absPathToBackup = dumpFile.toPath.toString
>>   val systemConfig = PersistentTestKit.getNewConfig(true, (cfg) => {
>> cfg.withValue("akka.persistence.snapshot-store.local.dir",
>>   ConfigValueFactory
>> 
>> .fromAnyRef(s"$absPathToBackup/boxsrv-snapshots/local-snapshots"))
>>   .withValue("akka.persistence.journal.leveldb.dir",
>>  
>> ConfigValueFactory.fromAnyRef(s"$absPathToBackup/boxsrv-journal/leveldb-journal"))
>>   })
>>
>>   val lamePersistActorID = "imastupidpersistactor"
>>   class LamePersistActor extends PersistentActor {
>> override def receiveRecover: Receive = {
>>   case m: Any => println(s"Recover $m")
>> }
>> override def receiveCommand: Receive = {
>>   case 'whoapersist =>
>> println("Persisting a thing!")
>> persist("persisted a thing!") { _ => () }
>>   case m: Any => println(s"Command $m")
>> }
>> override def persistenceId: String = lamePersistActorID
>>   }
>> }
>>
>> /**
>>   * Tests backwards compatibility for journaling / snapshots
>>   */
>> class PersistCompatIT extends TestKit(ActorSystem("PersistCompatSys",
>>   
>> PersistCompatIT.systemConfig)) with FunSuiteLike {
>>
>>   test("test journal backwards compatibility", EndToEndTest) {
>> val lameActor = system.actorOf(Props(new LamePersistActor))
>> lameActor ! 'whoapersist
>> Thread.sleep(1000)
>> val readJournal = 
>> PersistenceQuery(system).readJournalFor[LeveldbReadJournal](
>>   LeveldbReadJournal.Identifier)
>> implicit val mat = ActorMaterializer()(system)
>> for (i <- 1 to 3) {
>>   Thread.sleep(2000)
>>   val allPersistenceIDs = readJournal.allPersistenceIds()
>>   val dafuq = 
>> readJournal.eventsByPersistenceId(PersistCompatIT.lamePersistActorID)
>>   println("Found persistence IDs:")
>>   allPersistenceIDs.map { e =>
>> println(e)
>> e
>>   }
>>   dafuq.map { e =>
>> println(e.event)
>> e
>>   }
>>  

[akka-user] Re: Akka stream - implements Pause/Resume

2016-10-14 Thread Konrad 'ktoso' Malawski
Please read this:
- http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis 
- and this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream
-customize.html

Specifically, your trigger should be implemented as async-callback, as it 
comes from the outside but should "wake up" the stage to be able to push 
data again.
In your current setup it never "wakes up" since all pulls/pushes have been 
processed - the stage has no idea it should do something once you called 
open.

-- Konrad

W dniu piątek, 14 października 2016 09:03:54 UTC+2 użytkownik regis leray 
napisał:
>
> Hi,
>
> Im currently trying to implement a valve Graph to manage pause/resume. We 
> can control the behavior of the graph by using the MaterializeValue
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
>
> Current implementation of the valve
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
> }
>
>
> The current implementation is pretty simple, each time we are receiving a 
> onPull demand we are requesting by doing pull(in).
> When a onPush demand is received we are checking the current state 
> - if Open we are doing the default behavior by doing push(out,element)
> - if Close we are putting the element into a queue
>
> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends 
> GraphStageLogic(shape){
>   import shape._
>
>   var bufferedElement = List.empty[A]
>
>   val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
> override def close: Unit = {
>   mode = ValveMode.Closed
> }
>   }
>
>   setHandler(in, new InHandler {
> override def onPush(): Unit = {
>   val element = grab(in) //acquires the element that has been received 
> during an onPush
>   println(s"${mode} on push called with $element")
>   if (mode == ValveMode.Open) {
> push(out, element) //push directly the element on the out port
>   } else {
> bufferedElement = bufferedElement :+ element
>   }
> }
>   })
>
>   setHandler(out, new OutHandler {
> override def onPull(): Unit = {
>   println("on pull called")
>   pull(in) //request the next element on in port
> }
>   })
> }
>
>
> When we are resuming the valve my using the switch.open, we are pushing 
> the element
>
> override def open: Unit = {
>
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
>
> The Current test is failing
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>   
> val (valve, probe) = Source(1 to 5)
> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
> default is closed, dont push any message
> .toMat(TestSink.probe[Int])(Keep.both)
> .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open //open the valve should emit the previous
>
>
>   probe.expectNext shouldEqual 1 //we never receive the element
>   probe.expectNext shouldEqual 2
>
>   probe.request(3)
>   probe.expectNext shouldEqual 3
>   probe.expectNext shouldEqual 4
>   probe.expectNext shouldEqual 5
>
>   probe.expectComplete()
> }
>
>
> Here the console log
>
> on pull called
> Closed on push called with 1
> pushing Some(1), out is available ? true
>
> Expected OnNext(_), yet no element signaled during 3 seconds
> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 
> 3 seconds
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>
>
> I'm suspecting the current code to have an issue when we are resuming the 
> valve, it doesnt seems the push really works
>
> val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = Option.empty
> }
>
> override def close: Unit = {
>   mode = ValveMode.Closed
> }
>   }
>
>
> There is definitively something i dont catch up, if anyone 

[akka-user] Re: Akka stream - Flow in Pause

2016-10-14 Thread Konrad 'ktoso' Malawski
Seems like double posted the same question?
Please don't double post the same question :-)

I answered in that 
thread: https://groups.google.com/forum/#!topic/akka-user/7VqlP1w_CN8

-- 
Konrad
Akka Team

W dniu piątek, 14 października 2016 09:02:59 UTC+2 użytkownik regis leray 
napisał:
>
> I'm trying to implement the pause/resume in akka stream.
> The current implementation, is buffering all elements into an List if the 
> valve is in mode "close", if not we are pushing elements like the default 
> behavior.
>
> When we are resuming the flow by calling the materialize value 
> switch.open, we change the mode to open and pushing all buffered elements.
> Currently im suspecting this code to be the problem
>
> val switch = new ValveSwitch {
>   override def open: Unit = {
> mode = ValveMode.Open
> println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
> bufferedElement.foreach(push(out, _))
> bufferedElement = Option.empty
>   }
>
> ...
>
> }
>
>
> In my unit test when the switch is changing close to open, the Sink never 
> receive the elements
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>   val (valve, probe) = Source(1 to 5)
> .viaMat(new Valve(ValveMode.Closed))(Keep.right)
> .toMat(TestSink.probe[Int])(Keep.both)
> .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open // we are pushing the buffered elements
>   probe.expectNext shouldEqual 1 // this assert is failing !
>   probe.expectNext shouldEqual 2
>
> }
>
>
> Any help would be really appreciated
>
> Here
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>
>
> Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>>
>> Hi, 
>>
>> I'm trying to implements a way to control a flow (start/stop), nothing 
>> was implemented yet in the core of akka-stream
>>
>> My current implementation looks like this.
>>
>> trait ValveSwitch {
>>   def open: Unit
>>   def close: Unit
>> }
>>
>> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
>> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>>
>>   override val shape = FlowShape(Inlet[A]("valve.in"), 
>> Outlet[A]("valve.out"))
>>
>>   override def createLogicAndMaterializedValue(inheritedAttributes: 
>> Attributes): (GraphStageLogic, ValveSwitch) = {
>> val logic = new ValveGraphStageLogic(shape, mode)
>> (logic, logic.switch)
>>   }
>>
>>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
>> extends GraphStageLogic(shape){
>> import shape._
>>
>> var bufferedElement = List.empty[A]
>>
>> val switch = new ValveSwitch {
>>   override def open: Unit = {
>> mode = ValveMode.Open
>> println(s"pushing $bufferedElement, out is available ? 
>> ${isAvailable(out)}")
>> bufferedElement.foreach(push(out, _))
>> bufferedElement = List.empty
>>   }
>>
>>   override def close: Unit = {
>> mode = ValveMode.Closed
>>   }
>> }
>>
>> setHandler(in, new InHandler {
>>   override def onPush(): Unit = {
>> val element = grab(in) //acquires the element that has been 
>> received during an onPush
>> println(s"${mode} on push called with $element")
>> if (mode == ValveMode.Open) {
>>   push(out, element) //push directly the element on the out port
>> } else {
>>   bufferedElement = bufferedElement :+ element
>> }
>>   }
>> })
>>
>> setHandler(out, new OutHandler {
>>   override def onPull(): Unit = {
>> println("on pull called")
>> pull(in) //request the next element on in port
>>   }
>> })
>>   }
>> }
>>
>> trait ValveMode
>>
>> object ValveMode {
>>   case object Open extends ValveMode
>>   case object Closed extends ValveMode
>> }
>>
>> 
>>
>> My current unit test is failing. due to the fact when i open the valve, i 
>> never received the previous message.
>> It seems even if i push the element through ( valve.open ) the sink 
>> never receive the element
>>
>> class ValveSpec extends FlatSpec {
>>
>>   implicit val system = ActorSystem()
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = materializer.executionContext
>>
>>
>>   "A closed valve" should "emit only 3 elements after it has been open" 
>> in {
>> val (valve, probe) = Source(1 to 3)
>>   .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>>   .toMat(TestSink.probe[Int])(Keep.both)
>>   .run()
>>
>> probe.request(1)
>> probe.expectNoMsg()
>>
>> valve.open
>> probe.expectNext(1)
>>
>> probe.request(2)
>> probe.expectNext(2, 3)
>>
>> probe.expectComplete()
>>   }
>> }
>>
>>
>> Here the gist 
>> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> 

Re: [akka-user] Akka stream - implements Pause/Resume

2016-10-14 Thread Konrad Malawski
Please read this:
- http://blog.akka.io/integrations/2016/08/29/connecting-existing-apis
- and this:
http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html

Specifically, your trigger should be implemented as async-callback, as it
comes from the outside but should "wake up" the stage to be able to push
data again.
In your current setup it never "wakes up" since all pulls/pushes have been
processed - the stage has no idea it should do something once you called
open.

On Fri, Oct 14, 2016 at 12:13 AM, regis leray  wrote:

> Hi,
>
> Im currently trying to implement a valve Graph to manage pause/resume. We
> can control the behavior of the graph by using the MaterializeValue
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
>
> Current implementation of the valve
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
> }
>
>
> The current implementation is pretty simple, each time we are receiving a
> onPull demand we are requesting by doing pull(in).
> When a onPush demand is received we are checking the current state
> - if Open we are doing the default behavior by doing push(out,element)
> - if Close we are putting the element into a queue
>
> private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends 
> GraphStageLogic(shape){
>   import shape._
>
>   var bufferedElement = List.empty[A]
>
>   val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
> override def close: Unit = {
>   mode = ValveMode.Closed
> }
>   }
>
>   setHandler(in, new InHandler {
> override def onPush(): Unit = {
>   val element = grab(in) //acquires the element that has been received 
> during an onPush
>   println(s"${mode} on push called with $element")
>   if (mode == ValveMode.Open) {
> push(out, element) //push directly the element on the out port
>   } else {
> bufferedElement = bufferedElement :+ element
>   }
> }
>   })
>
>   setHandler(out, new OutHandler {
> override def onPull(): Unit = {
>   println("on pull called")
>   pull(in) //request the next element on in port
> }
>   })
> }
>
>
> When we are resuming the valve my using the switch.open, we are pushing
> the element
>
> override def open: Unit = {
>
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = List.empty
> }
>
>
> The Current test is failing
>
> "A closed valve" should "emit only 3 elements after it has been open" in {
>
> val (valve, probe) = Source(1 to 5)
> .viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
> default is closed, dont push any message
> .toMat(TestSink.probe[Int])(Keep.both)
> .run()
>
>   probe.request(2)
>   probe.expectNoMsg()
>
>   valve.open //open the valve should emit the previous
>
>
>   probe.expectNext shouldEqual 1 //we never receive the element
>   probe.expectNext shouldEqual 2
>
>   probe.request(3)
>   probe.expectNext shouldEqual 3
>   probe.expectNext shouldEqual 4
>   probe.expectNext shouldEqual 5
>
>   probe.expectComplete()
> }
>
>
> Here the console log
>
> on pull called
> Closed on push called with 1
> pushing Some(1), out is available ? true
>
> Expected OnNext(_), yet no element signaled during 3 seconds
> java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 
> 3 seconds
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
> at 
> akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
> at 
> com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
>
>
> I'm suspecting the current code to have an issue when we are resuming the
> valve, it doesnt seems the push really works
>
> val switch = new ValveSwitch {
> override def open: Unit = {
>   mode = ValveMode.Open
>   println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
>
>   bufferedElement.foreach(push(out, _))
>   bufferedElement = Option.empty
> }
>
> override def close: Unit = {
>   mode = ValveMode.Closed
> }
>   }
>
>
> There is definitively something i dont catch up, if anyone could help me
> to see some 

[akka-user] Akka stream - implements Pause/Resume

2016-10-14 Thread regis leray
Hi,

Im currently trying to implement a valve Graph to manage pause/resume. We 
can control the behavior of the graph by using the MaterializeValue

trait ValveSwitch {
  def open: Unit
  def close: Unit
}


Current implementation of the valve

class Valve[A](mode: ValveMode = ValveMode.Open) extends 
GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
  }

}


The current implementation is pretty simple, each time we are receiving a 
onPull demand we are requesting by doing pull(in).
When a onPush demand is received we are checking the current state 
- if Open we are doing the default behavior by doing push(out,element)
- if Close we are putting the element into a queue

private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends 
GraphStageLogic(shape){
  import shape._

  var bufferedElement = List.empty[A]

  val switch = new ValveSwitch {
override def open: Unit = {
  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? 
${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = List.empty
}

override def close: Unit = {
  mode = ValveMode.Closed
}
  }

  setHandler(in, new InHandler {
override def onPush(): Unit = {
  val element = grab(in) //acquires the element that has been received 
during an onPush
  println(s"${mode} on push called with $element")
  if (mode == ValveMode.Open) {
push(out, element) //push directly the element on the out port
  } else {
bufferedElement = bufferedElement :+ element
  }
}
  })

  setHandler(out, new OutHandler {
override def onPull(): Unit = {
  println("on pull called")
  pull(in) //request the next element on in port
}
  })
}


When we are resuming the valve my using the switch.open, we are pushing the 
element

override def open: Unit = {

  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = List.empty
}


The Current test is failing

"A closed valve" should "emit only 3 elements after it has been open" in {
  
val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by 
default is closed, dont push any message
.toMat(TestSink.probe[Int])(Keep.both)
.run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open //open the valve should emit the previous


  probe.expectNext shouldEqual 1 //we never receive the element
  probe.expectNext shouldEqual 2

  probe.request(3)
  probe.expectNext shouldEqual 3
  probe.expectNext shouldEqual 4
  probe.expectNext shouldEqual 5

  probe.expectComplete()
}


Here the console log

on pull called
Closed on push called with 1
pushing Some(1), out is available ? true

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 
seconds
at 
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
at 
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
at 
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
at 
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)


I'm suspecting the current code to have an issue when we are resuming the 
valve, it doesnt seems the push really works

val switch = new ValveSwitch {
override def open: Unit = {
  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? 
${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = Option.empty
}

override def close: Unit = {
  mode = ValveMode.Closed
}
  }


There is definitively something i dont catch up, if anyone could help me to 
see some light

Here the gist 
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938

Any help would be appreciated

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka stream - Flow in Pause

2016-10-14 Thread regis leray
I'm trying to implement the pause/resume in akka stream.
The current implementation, is buffering all elements into an List if the 
valve is in mode "close", if not we are pushing elements like the default 
behavior.

When we are resuming the flow by calling the materialize value switch.open, 
we change the mode to open and pushing all buffered elements.
Currently im suspecting this code to be the problem

val switch = new ValveSwitch {
  override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

bufferedElement.foreach(push(out, _))
bufferedElement = Option.empty
  }

...

}


In my unit test when the switch is changing close to open, the Sink never 
receive the elements

"A closed valve" should "emit only 3 elements after it has been open" in {
  val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right)
.toMat(TestSink.probe[Int])(Keep.both)
.run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open // we are pushing the buffered elements
  probe.expectNext shouldEqual 1 // this assert is failing !
  probe.expectNext shouldEqual 2

}


Any help would be really appreciated

Here
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938


Le jeudi 13 octobre 2016 12:52:58 UTC-4, regis leray a écrit :
>
> Hi, 
>
> I'm trying to implements a way to control a flow (start/stop), nothing was 
> implemented yet in the core of akka-stream
>
> My current implementation looks like this.
>
> trait ValveSwitch {
>   def open: Unit
>   def close: Unit
> }
>
> class Valve[A](mode: ValveMode = ValveMode.Open) extends 
> GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
>
>   override val shape = FlowShape(Inlet[A]("valve.in"), 
> Outlet[A]("valve.out"))
>
>   override def createLogicAndMaterializedValue(inheritedAttributes: 
> Attributes): (GraphStageLogic, ValveSwitch) = {
> val logic = new ValveGraphStageLogic(shape, mode)
> (logic, logic.switch)
>   }
>
>   private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
> extends GraphStageLogic(shape){
> import shape._
>
> var bufferedElement = List.empty[A]
>
> val switch = new ValveSwitch {
>   override def open: Unit = {
> mode = ValveMode.Open
> println(s"pushing $bufferedElement, out is available ? 
> ${isAvailable(out)}")
> bufferedElement.foreach(push(out, _))
> bufferedElement = List.empty
>   }
>
>   override def close: Unit = {
> mode = ValveMode.Closed
>   }
> }
>
> setHandler(in, new InHandler {
>   override def onPush(): Unit = {
> val element = grab(in) //acquires the element that has been 
> received during an onPush
> println(s"${mode} on push called with $element")
> if (mode == ValveMode.Open) {
>   push(out, element) //push directly the element on the out port
> } else {
>   bufferedElement = bufferedElement :+ element
> }
>   }
> })
>
> setHandler(out, new OutHandler {
>   override def onPull(): Unit = {
> println("on pull called")
> pull(in) //request the next element on in port
>   }
> })
>   }
> }
>
> trait ValveMode
>
> object ValveMode {
>   case object Open extends ValveMode
>   case object Closed extends ValveMode
> }
>
> 
>
> My current unit test is failing. due to the fact when i open the valve, i 
> never received the previous message.
> It seems even if i push the element through ( valve.open ) the sink never 
> receive the element
>
> class ValveSpec extends FlatSpec {
>
>   implicit val system = ActorSystem()
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext = materializer.executionContext
>
>
>   "A closed valve" should "emit only 3 elements after it has been open" in 
> {
> val (valve, probe) = Source(1 to 3)
>   .viaMat(new Valve(ValveMode.Closed))(Keep.right)
>   .toMat(TestSink.probe[Int])(Keep.both)
>   .run()
>
> probe.request(1)
> probe.expectNoMsg()
>
> valve.open
> probe.expectNext(1)
>
> probe.request(2)
> probe.expectNext(2, 3)
>
> probe.expectComplete()
>   }
> }
>
>
> Here the gist 
> https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.