Re: [akka-user][deprecated] Akka Streams Resuming Materializer does not log exceptions

2018-05-17 Thread dollyg
Hi Konrad,

Thanks for your response. As you suggested, I have started a thread on
discuss.lightbend and posted a follow-up question there. Looking forward to
your reply.

Thanks,
Dolly

On Thursday, May 17, 2018 at 8:14:31 AM UTC+5:30, Konrad Malawski wrote:
>
> Use the .log() operator.
>
> In the same way, exceptions thrown in a Future do not get logged automatically.
>
> There has been discussion about enabling this always, however you’d then 
> be swamped with errors which are recovered and actually “fine and nothing 
> to worry about”.
>
> If you want logging, put a log() there.
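
A minimal sketch of one way to get a log line for every resumed failure, assuming the Akka 2.5-era APIs used in this thread. The names LoggedResume, loggingResumeDecider and withLoggedResume are hypothetical helpers, not part of Akka. The assumption behind it: with a resuming strategy the failing element is dropped inside the failing stage, so a log() placed downstream may never see the exception, whereas the supervision decider is called with it.

```scala
import akka.event.LoggingAdapter
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Source

object LoggedResume {
  // Hypothetical helper: logs the exception, then tells the stream to drop
  // the failing element and keep running.
  def loggingResumeDecider(log: LoggingAdapter): Supervision.Decider = { ex =>
    log.error(ex, "Stream element failed; resuming")
    Supervision.Resume
  }

  // Hypothetical helper: attaches the logging decider to an existing source,
  // so it applies to the stages that source is built from.
  def withLoggedResume[T, M](source: Source[T, M], log: LoggingAdapter): Source[T, M] =
    source.addAttributes(ActorAttributes.supervisionStrategy(loggingResumeDecider(log)))
}
```

With the Demo object from the original question, this could be used as LoggedResume.withLoggedResume(Source.tick(0.millis, 10.millis, ()).map(_ => eventGenerator()), actorSystem.log).runForeach(println). The same decider can also be passed to ActorMaterializerSettings.withSupervisionStrategy in place of the resuming decider.
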
>
> PS: New discussion forum: https://discuss.akka.io/ replacing akka-user 
> google-group soon. :-)
>
> -- 
> Cheers,
> Konrad 'ktoso' Malawski
> Akka @ Lightbend
>

-- 
*
** New discussion forum: https://discuss.akka.io/ replacing akka-user 
google-group soon.
** This group will soon be put into read-only mode, and replaced by 
discuss.akka.io
** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
*
>> 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user][deprecated] Akka Streams Resuming Materializer does not log exceptions

2018-05-16 Thread dollyg
Hello all,

When I provide a resuming materializer to the stream, the exceptions that
occur in the stream do not get logged. Is there any way to log these
exceptions?

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer, Supervision}

import scala.concurrent.duration.DurationInt

object Demo extends App {

  private implicit val actorSystem: ActorSystem = ActorSystem()

  // Resume on failure: the failing element is dropped and the stream keeps running.
  lazy val settings: ActorMaterializerSettings =
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(Supervision.getResumingDecider)

  implicit lazy val resumingMat: Materializer = ActorMaterializer(settings)

  var counter = 0

  // Succeeds for the first five calls, then throws on every call.
  def eventGenerator(): Int = {
    counter += 1
    if (counter > 5) throw new RuntimeException("Could not create an event")
    else 1
  }

  Source.tick(0.millis, 10.millis, ()).map(_ ⇒ eventGenerator()).runForeach(println)
}



Re: [akka-user] Akka Typed BehaviorTestKit with TimerScheduler

2018-03-08 Thread dollyg
Thanks, Patrik.

For now, we have used a workaround: mocking a TimerScheduler instance as
below:

val timerScheduler = mock[TimerScheduler[MyMessage]]

// Stub the timer calls so that no real scheduler is needed.
doNothing()
  .when(timerScheduler)
  .startSingleTimer(
    ArgumentMatchers.eq(MyBehavior.TimerKey),
    ArgumentMatchers.any[MyMessage],
    ArgumentMatchers.any[FiniteDuration]
  )

doNothing().when(timerScheduler).cancel(ArgumentMatchers.eq(MyBehavior.TimerKey))


and created the behavior as follows:

val behaviorTestKit = BehaviorTestKit(createBehavior(timerScheduler))

// The mocked scheduler is passed in directly instead of using Behaviors.withTimers.
private def createBehavior(timerScheduler: TimerScheduler[MyMessage]): Behavior[MyMessage] = {
  Behaviors
    .mutable[MyMessage](
      ctx =>
        new MyBehavior(
          ctx,
          timerScheduler,
          someInfo
        )
    )
}


We have also raised an issue for this.

Thanks and Regards,
Dolly


On Thursday, March 8, 2018 at 3:13:09 PM UTC+5:30, Patrik Nordwall wrote:
>
> I think it will be difficult to support the ordinary scheduler in 
> BehaviorTestKit but perhaps we could use the manual scheduler: 
> https://doc.akka.io/docs/akka/current/typed/testing.html#controlling-the-scheduler
>
> Please create an issue at https://github.com/akka/akka/issues/new
>
> Thanks,
> Patrik
>
>
>
>
> -- 
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend  -  Reactive apps on the JVM
> Twitter: @patriknw
>
>



[akka-user] Akka Typed BehaviorTestKit with TimerScheduler

2018-03-07 Thread dollyg
Hi,

We are trying to create a BehaviorTestKit for a behavior that starts a timer
internally.

val behaviorTestKit = BehaviorTestKit(createBehavior())


private def createBehavior(): Behavior[MyMessage] = {
  Behaviors
    .withTimers[MyMessage](
      timerScheduler =>
        Behaviors
          .mutable[MyMessage](
            ctx =>
              new MyBehavior(
                ctx,
                timerScheduler,
                someInfo
              )
          )
    )
}


The above code results in the error below:

An exception or error caused a run to abort: no scheduler
java.lang.UnsupportedOperationException: no scheduler

spawn, provided by ActorTestKit, works fine for spawning an actor for this
behavior.

However, we need to test the underlying effects of the given behavior and
hence need to use BehaviorTestKit.
Could someone please provide a solution to this problem?

Thanks and Regards,
Dolly
ThoughtWorks



Re: [akka-user] [Akka Streams] Difference between KillSwitch and Materializer.shutdown()

2018-03-01 Thread dollyg
This was helpful. Thank you!

On Monday, February 19, 2018 at 6:04:45 PM UTC+5:30, Konrad Malawski wrote:
>
> First warning sign: Why would you have one stream per materializer?
>
> Shutting down the materializer while things are running is very brutal.
> It’s like forcefully pulling the carpet from under someone’s feet,
> and then laughing as they spill their coffee on themselves. Don’t do
> this as the go-to solution.
> It’s better than leaving resource leaks, but don’t do this as the “clean”
> shutdown.
>
>
> KillSwitches actually signal termination properly using streams signals — 
> cancelation and completion/error.
> Use them when you want to externally stop things.
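
A minimal sketch of stopping a single stream from the outside with a KillSwitch, assuming a Materializer is available implicitly. KillSwitchSketch and runTicks are hypothetical names used only for illustration.

```scala
import akka.Done
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object KillSwitchSketch {
  // Runs an endless tick stream and returns a handle to stop it from outside,
  // together with a Future that completes once the stream has terminated.
  def runTicks()(implicit mat: Materializer): (UniqueKillSwitch, Future[Done]) =
    Source
      .tick(0.millis, 10.millis, "tick")
      .viaMat(KillSwitches.single)(Keep.right) // keep the kill switch as materialized value
      .toMat(Sink.ignore)(Keep.both)           // also keep the sink's completion Future
      .run()
}
```

Calling shutdown() on the returned switch completes the stream normally, while abort(ex) fails it with the given exception. Other streams run by the same materializer are not affected.
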
>
> -- 
> Cheers,
> Konrad 'ktoso' Malawski
> Akka @ Lightbend
>



[akka-user] [Akka Streams] Feedback on custom throttle stage

2018-03-01 Thread dollyg


Hi all,

We are trying to create a stage that meets two requirements:

1) If the producer is faster than the consumer, the consumer should always get
the latest element (intermediate elements are dropped). This is similar to the
conflate API, and has been discussed before.

2) If the producer is slower than the consumer, the consumer (which pulls at a
higher frequency) should get the latest element of the stream on each pull,
which means elements will be duplicated. This is similar to the expand API.

Since the conflate and expand stages both buffer elements, which increases
latency, we are creating our own stage.
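
For comparison, the two built-in operators mentioned above could be combined roughly as follows. This is only a sketch of the built-in alternative, not the custom stage; LatestValue and flow are hypothetical names, and unlike the custom stage it has no timer, so the emission rate is driven purely by downstream demand.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object LatestValue {
  // Sketch using only built-in operators:
  //  - conflate keeps just the newest element while downstream is busy
  //  - expand repeats the most recent element while upstream is idle
  def flow[A]: Flow[A, A, NotUsed] =
    Flow[A]
      .conflate((_, newest) => newest)
      .expand(elem => Iterator.continually(elem))
}
```
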
 
Looking for feedback on the code below.

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.concurrent.duration.FiniteDuration

class CustomThrottleStage[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
  final val in    = Inlet.create[A]("Throttle.in")
  final val out   = Outlet.create[A]("Throttle.out")
  final val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      private var isPulled             = false
      private var maybeElem: Option[A] = None

      override def preStart(): Unit = {
        schedulePeriodically(None, delay)
        pull(in)
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            // Whenever upstream pushes an element, store it; it is pushed only on the timer.
            maybeElem = Some(grab(in))
            pull(in) // drop elements - required when the producer is faster
          }
        }
      )

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            isPulled = true
          }
        }
      )

      override def onTimer(key: Any): Unit = {
        // On timer, push only if there is demand from downstream.
        if (isPulled) {
          maybeElem.foreach { x =>
            isPulled = false
            push(out, x)
          }
        }
      }
    }
}



Regards,

Dolly






[akka-user] [Akka Streams] Difference between KillSwitch and Materializer.shutdown()

2018-02-19 Thread dollyg
I came across 2 ways to terminate a stream:

   1. KillSwitch
   2. Materializer.shutdown()

I see one difference:

   - Materializer.shutdown() is used to kill all streams materialized by
   that materializer, whereas KillSwitch can be used to terminate one
   particular stream.


In a scenario where I have one materializer per stream, is there any
difference between KillSwitch and Materializer.shutdown()? Which one should
be used, and when?
