Re: [akka-user] Splitting a stream to be consumed by dynamic sinks

2017-10-31 Thread Jason Steenstra-Pickens
Hi Johan,

As far as I can tell, this only creates a single Sink based on the first 
element. I need a dynamic number of Sinks. In the past I have used a custom 
version of this which creates a new Source for every element 
(OneToOneOnDemandSink). Neither of these solves the case above, though.

I could create a variation of the OneToOneOnDemandSink called 
ManyToOneOnDemandSink that takes a predicate to determine when to create 
the next Sink. However, this is still too specialised and we lose 
composability. For example, I would need to create three variations to cover 
the built-in SubFlow API (groupBy, splitAfter, splitWhen), which is doable, 
but it pushes all the logic into the Sink and doesn't cover any 
custom SubFlows.
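
One pattern that may cover this without a custom Sink is to combine splitAfter with prefixAndTail(1), so that the first element of each substream is available to choose a Sink and the rest of the substream is run into it. The following is a minimal sketch under stated assumptions, not a definitive implementation: sinkFor is a hypothetical factory (a real one might be an HTTP upload whose URL contains the chunk number), Source(1 to 8) stands in for the file parts, and ActorMaterializer reflects the Akka 2.4/2.5 era of this thread.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DynamicSinks {
  // Hypothetical per-chunk sink: it only sums the parts it receives.
  def sinkFor(chunk: Int): Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  def run(partsPerChunk: Int): Seq[(Int, Int)] = {
    implicit val system: ActorSystem = ActorSystem("dynamic-sinks")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Lazily generated (chunk, part) counters.
    val counts = Source.fromIterator { () =>
      for {
        chunk <- Iterator.from(1)
        part <- (1 to partsPerChunk).iterator
      } yield (chunk, part)
    }

    val results = Source(1 to 8) // stand-in for the file parts
      .zip(counts)
      .splitAfter { case (_, (_, part)) => part == partsPerChunk }
      .prefixAndTail(1) // expose the first element plus the rest as a Source
      .mapAsync(1) { case (prefix, tail) =>
        prefix.headOption match {
          case Some(first @ (_, (chunk, _))) =>
            // Re-attach the first element and run the substream into its own sink.
            (Source.single(first) ++ tail)
              .map { case (payload, _) => payload }
              .runWith(sinkFor(chunk))
              .map(sum => Option(chunk -> sum))
          case None =>
            Future.successful(Option.empty[(Int, Int)])
        }
      }
      .collect { case Some(result) => result }
      .concatSubstreams
      .runWith(Sink.seq)

    try Await.result(results, 10.seconds)
    finally system.terminate()
  }
}
```

Calling DynamicSinks.run(3) returns one (chunk, sum) pair per chunk. Because mapAsync(1) waits for each inner run to finish, demand from the inner Sink drives the tail Source, and a failed inner run fails the outer stream under the default supervision strategy.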


Cheers,
Jason

On Tuesday, 31 October 2017 22:33:06 UTC+13, Akka Team wrote:
>
> Check whether Sink.lazyInit() does what you want.
>
> --
> Johan
> Akka Team
>
> On Tue, Oct 31, 2017 at 12:59 AM, Jason Steenstra-Pickens <
> thest...@gmail.com > wrote:
>
>> Hi,
>>
>> I keep running into a recurring problem when using Akka Streams 
>> and haven't found the right client API to solve it.
>>
>> The problem usually translates into:
>>
>>    - I have some possibly infinite Source
>>    - I want to split it into multiple inner Sources based on some 
>>    condition such as a delimiter, a count, or whatever
>>    - I then want to create a Sink for each inner Source dynamically and 
>>    run each inner Flow
>>    - I want the backpressure, errors, completion, cancellation and stuff 
>>    like that to be shared between the outer Flow and the inner Flow
>>
>> There are a few things that come close, but all seem to be for a slightly 
>> different use case, such as:
>>
>>    - splitAfter / splitWhen
>>    - lazy / lazyInit
>>    - the various hubs
>>
>> Here is a concrete example:
>>
>>    1. Reading a file in 8KB parts
>>    2. Splitting the first 625 parts into a separate stream as a "chunk"
>>    3. Create an HTTP source that has a URL containing the chunk number
>>    4. Send the 625 parts to that source
>>    5. Take the next chunk from step 2
>>
>> An attempt using a SubFlow looks like:
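>> val chunkSize = fileResponse.chunkSize
>> val partsPerChunk = chunkSize / partSize
>> // Generate the (chunk, part) counters lazily: a for-comprehension over
>> // `1 to Int.MaxValue` is strict and would try to build every pair in memory.
>> val counts = Source.fromIterator { () =>
>>   for {
>>     chunk <- Iterator.from(1)
>>     part <- (1 to partsPerChunk).iterator
>>   } yield (chunk, part)
>> }
>> val source = FileIO.fromPath(filePath, partSize)
>>   .zip(counts)
>>   // Close the current substream after the last part of each chunk.
>>   .splitAfter { case (_, (_, part)) => part == partsPerChunk }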
>>
>> This is quite nice but then there doesn't seem to be a way of getting to 
>> the inner Flow even if I were to create a custom Sink.
>>
>> It would be really awesome if SubFlow had a function like:
>>   def to[M](f: Out => Graph[SinkShape[Out], M]): C
>> (although probably without the materialised value since there would be 
>> multiple).
>>
>> Is there something obvious that I am missing?
>>
>>
>> Cheers,
>> Jason

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Persistence NullPointerException during actor start up

2017-10-31 Thread prashanth . ayyavu
Hi Akka gurus,

   1. We use Akka in a production setup
   2. We use Cassandra as the backing store for akka-persistence.
   3. One of the PersistentActors threw a NullPointerException during 
   startup. 

*The Akka libraries used :*

"com.typesafe.akka" %% "akka-actor" % "2.4.17",
"com.typesafe.akka" %% "akka-slf4j" % "2.4.17",
"com.typesafe.akka" %% "akka-remote" % "2.4.17",
"com.typesafe.akka" %% "akka-cluster" % "2.4.17",
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.4.17",
"com.typesafe.akka" %% "akka-persistence" % v,
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.23",
"com.typesafe.akka" %% "akka-http-core" % "10.0.1",
"com.typesafe.akka" %% "akka-stream" % "2.4.17",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.17"


*The receiveRecover method of my actor is as follows:*



override def receiveRecover: Receive = {
  // Snapshot of a reset device: clear the task and go back to idle.
  case SnapshotOffer(_, snapshot: DeviceActorReset) =>
    isProcessing = false
    task = null
    context.become(receiveIdleCommand())

  // Replayed events restore the task and the matching command handler.
  case TaskParsed(parsedTask) =>
    isProcessing = true
    task = parsedTask
    context.become(receiveValidateCommand())

  case TaskValidated =>
    context.become(receiveProcessCommand)

  case RecoveryCompleted =>
    logger.debug(s"Recovery completed successfully for device actor : ${self.path.name}")
    self ! Continue
}



When one of the PersistentActors started, we got a 
java.lang.NullPointerException.


*The logs are as follows*


DEBUG [com.sam.sami.argus.tasks.PersistentDeviceActor] (argus-akka.actor.default-dispatcher-5) 5643266d56c44415bc65ab7301ef938a Starting a deviceActor
ERROR [akka.actor.OneForOneStrategy] (argus-akka.actor.default-dispatcher-16) null
java.lang.NullPointerException: null
    at com.sam.sami.argus.tasks.PersistentDeviceActor$$anonfun$receiveProcessCommand$1.applyOrElse(PersistentDeviceActor.scala:183)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
    at com.sam.sami.argus.tasks.PersistentDeviceActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentDeviceActor.scala:29)
    at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:664)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:183)
    at com.sam.sami.argus.tasks.PersistentDeviceActor.aroundReceive(PersistentDeviceActor.scala:29)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [com.sam.sami.argus.tasks.PersistentDeviceActor] (argus-akka.actor.default-dispatcher-16) Snapshot recovery : DeviceActorReset(5643266d56c44415bc65ab7301ef938a)
DEBUG [com.sam.sami.argus.tasks.PersistentDeviceActor] (argus-akka.actor.default-dispatcher-5) Recovery completed successfully for device actor : 5643266d56c44415bc65ab7301ef938a


*Actor behavior or States*

The actor has the following behaviors (states). I mention them because the 
state names appear in the logs above.


   - receiveIdleCommand
   - receiveValidateCommand
   - receiveProcessCommand
   
5643266d56c44415bc65ab7301ef938a is just an ID we use to create actor names.


I am trying to understand what caused this.
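
One way to narrow this down, assuming the NullPointerException at PersistentDeviceActor.scala:183 comes from dereferencing a task that is still null when the process handler runs (the trace does not confirm this), is to model the recovered state without null so that an impossible transition shows up as an explicit error. The sketch below is plain Scala and every name in it (Task, DeviceReset, Phase, step, replay) is hypothetical; it only mirrors the shape of the receiveRecover method above.

```scala
object RecoveryState {
  final case class Task(id: String) // hypothetical task type

  sealed trait Event
  final case class TaskParsed(task: Task) extends Event
  case object TaskValidated extends Event
  case object DeviceReset extends Event // stands in for the snapshot offer

  sealed trait Phase
  case object Idle extends Phase
  final case class Validating(task: Task) extends Phase
  final case class Processing(task: Task) extends Phase

  // Applies one recovered event; a transition that would leave the
  // processing state without a task becomes a Left instead of a null.
  def step(phase: Phase, event: Event): Either[String, Phase] =
    (phase, event) match {
      case (_, DeviceReset)                  => Right(Idle)
      case (_, TaskParsed(task))             => Right(Validating(task))
      case (Validating(task), TaskValidated) => Right(Processing(task))
      case (other, TaskValidated) =>
        Left(s"TaskValidated replayed while in $other, which is not Validating")
    }

  // Folds the recovered events in order, stopping at the first error.
  def replay(events: Seq[Event]): Either[String, Phase] =
    events.foldLeft[Either[String, Phase]](Right(Idle)) {
      case (Right(phase), event) => step(phase, event)
      case (error, _)            => error
    }
}
```

For example, RecoveryState.replay(Seq(RecoveryState.DeviceReset, RecoveryState.TaskValidated)) yields a Left describing the bad transition, which is the kind of sequence worth looking for in the journal if the assumption above holds.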





Re: [akka-user] Slow subscribers never get invoked after sometime

2017-10-31 Thread Patrik Nordwall
If these actors are blocking while waiting for Postgres, you might see a
starvation issue: all threads in the dispatcher are blocked. You should be
able to confirm this with thread dumps, a profiler, or Lightbend’s Thread
Starvation Detector.
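
If starvation is confirmed, a common mitigation is to run the blocking subscriber on its own dispatcher so that it cannot occupy the threads other actors need. The following is a minimal sketch under assumptions: the dispatcher name blocking-io-dispatcher, the pool size, the RecordEvent type and the PostgresSubscriber actor are all hypothetical, the system event stream stands in for whatever event bus is actually used, and Thread.sleep stands in for the blocking database write.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object BlockingSubscriber {
  // Hypothetical event type, named for illustration only.
  final case class RecordEvent(id: Long)

  // Hypothetical subscriber that blocks its thread on every message.
  class PostgresSubscriber extends Actor {
    def receive: Receive = {
      case RecordEvent(_) =>
        Thread.sleep(100) // stands in for a blocking write with retries
    }
  }

  // A fixed thread pool reserved for blocking work.
  val config = ConfigFactory.parseString(
    """
    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor { fixed-pool-size = 8 }
      throughput = 1
    }
    """).withFallback(ConfigFactory.load())

  def start(): ActorSystem = {
    val system = ActorSystem("events", config)
    // Only this actor runs on the blocking pool; others keep the default dispatcher.
    val subscriber = system.actorOf(
      Props[PostgresSubscriber].withDispatcher("blocking-io-dispatcher"),
      "postgres-subscriber")
    system.eventStream.subscribe(subscriber, classOf[RecordEvent])
    system
  }
}
```

With this layout a Postgres outage can still fill the subscriber's mailbox, but the blocked threads belong to the dedicated pool rather than to the default dispatcher.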

/Patrik
On Mon, 30 Oct 2017 at 06:56, wrote:

> Hi Team,
>
> I recently noticed the following behavior with the Akka event bus. Different
> subscribers listen to events, and one subscriber writes to Postgres. If a
> write fails, this subscriber retries up to 10 times with exponential
> back-off and a maximum wait of 10 seconds.
>
> We recently had a Postgres outage, so I saw a large number of retry attempts
> and lots of slow subscribers, but after some time the Postgres subscriber
> was not invoked at all. Other (non-Postgres) subscribers for the same event
> were still invoked, so there is no issue with publishing; something is going
> wrong when dispatching messages to subscribers.
>
> I can simulate this by adding a timeout in the Postgres subscriber's
> onMessage method and publishing a bunch of messages to the event bus: all the
> other subscribers get invoked, but the Postgres subscriber does not.
>
> Is this a known issue, or is there a way to avoid this behavior?
>
> Regards
> Lahiru
>



Re: [akka-user] Re: Mixing shard regions and proxies to shard regions in the same actor system

2017-10-31 Thread Patrik Nordwall
You're welcome

On Tue, Oct 31, 2017 at 11:55 AM, Eduardo Fernandes 
wrote:

> The fixed code.
>
>
> On Tuesday, 31 October 2017, 11:54:49 (UTC+1), Eduardo Fernandes
> wrote:
>>
>> Fine!! It works like a charm!!
>>
>> Many thanks Patrik for your time on this!!
>>
>> I'm attaching the code fixed with your comments.
>>
>> Many thanks for your time on this. It was not clear to me that the role
>> was mandatory. I assumed that no role meant any role. My fault.
>>
>> Happy Halloween and thanks again!
>>
>>
>> On Tuesday, 31 October 2017, 10:53:57 (UTC+1), Patrik Nordwall
>> wrote:
>>>
>>> In that example you are not using roles at all, which I think is
>>> necessary.
>>> You need 3 roles:
>>>
>>>    - backend-0 where you start region-0, and start proxy for region-1
>>>    with role backend-1
>>>    - backend-1 where you start region-1, and start proxy for region-0
>>>    with role backend-0
>>>    - frontend where you start proxy for region-0 with role backend-0,
>>>    and proxy for region-1 with role backend-1
>>>
>>> /Patrik
>>>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Re: Mixing shard regions and proxies to shard regions in the same actor system

2017-10-31 Thread Eduardo Fernandes
Fine!! It works like a charm!!

Many thanks Patrik for your time on this!!

I'm attaching the code fixed with your comments.

Many thanks for your time on this. It was not clear to me that the role 
was mandatory. I assumed that no role meant any role. My fault.

Happy Halloween and thanks again!


On Tuesday, 31 October 2017, 10:53:57 (UTC+1), Patrik Nordwall 
wrote:
>
> In that example you are not using roles at all, which I think is necessary.
> You need 3 roles: 
>
>    - backend-0 where you start region-0, and start proxy for region-1 
>    with role backend-1
>    - backend-1 where you start region-1, and start proxy for region-0 
>    with role backend-0
>    - frontend where you start proxy for region-0 with role backend-0, and 
>    proxy for region-1 with role backend-1
>
> /Patrik
>
>



Re: [akka-user] Re: Mixing shard regions and proxies to shard regions in the same actor system

2017-10-31 Thread Patrik Nordwall
In that example you are not using roles at all, which I think is necessary.
You need 3 roles:

   - backend-0 where you start region-0, and start proxy for region-1 with
   role backend-1
   - backend-1 where you start region-1, and start proxy for region-0 with
   role backend-0
   - frontend where you start proxy for region-0 with role backend-0, and
   proxy for region-1 with role backend-1
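
The layout above can be sketched with the classic Cluster Sharding API roughly as follows. This is an illustration under assumptions rather than the code attached to the thread: Envelope, DeviceEntity, the shard count of 10 and the regionRoles table are hypothetical, and each node is assumed to declare its own role through akka.cluster.roles in its configuration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

object RoleBasedSharding {
  // Hypothetical message and entity, used only for illustration.
  final case class Envelope(entityId: String, payload: String)

  class DeviceEntity extends Actor {
    def receive: Receive = { case payload => sender() ! payload }
  }

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(id, payload) => (id, payload)
  }
  val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(id, _) => math.abs(id.hashCode % 10).toString
  }

  // For each region type, the role of the nodes that host it.
  val regionRoles = Map("region-0" -> "backend-0", "region-1" -> "backend-1")

  // Starts a real region where the node's role matches, and a proxy that
  // names the hosting role everywhere else (including on frontend nodes).
  def start(system: ActorSystem, nodeRole: String): Map[String, ActorRef] =
    regionRoles.map { case (typeName, hostRole) =>
      val ref =
        if (hostRole == nodeRole)
          ClusterSharding(system).start(
            typeName,
            Props[DeviceEntity],
            ClusterShardingSettings(system).withRole(hostRole),
            extractEntityId,
            extractShardId)
        else
          ClusterSharding(system).startProxy(
            typeName, Some(hostRole), extractEntityId, extractShardId)
      typeName -> ref
    }
}
```

On a frontend node neither entry matches, so start(system, "frontend") creates two proxies, which corresponds to the third bullet.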

/Patrik


On Tue, Oct 31, 2017 at 3:52 AM, Eduardo Fernandes  wrote:

> Hi all again.
>
> After playing a bit with the sample code I saw another effect I still don't
> understand. I modified the original code so that it can create only two
> proxies instead of one materialized shard region and one proxy. When I do
> that, the original materialized regions that worked before stop working.
>
> Please test it by changing the boolean createProxyOnly at the beginning of
> the class to 'true'.
>
> Again, many thanks for your time on this. I'm pretty sure it is a
> simple mistake in my 15-line sample code, but I can't catch it.
>
> Best regards.
>
> /Eduardo
>



-- 

Patrik Nordwall
Akka Tech Lead
Lightbend  -  Reactive apps on the JVM
Twitter: @patriknw



Re: [akka-user] Splitting a stream to be consumed by dynamic sinks

2017-10-31 Thread Akka Team
Check whether Sink.lazyInit() does what you want.
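
For reference, a minimal sketch of how Sink.lazyInit behaves, assuming the Akka 2.5-era signature (a sink factory invoked with the first element, plus a fallback for an empty stream; later Akka versions deprecate it). The counter and the Sink.seq inner sink are illustrative choices only.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LazyInitOnce {
  // Returns how often the sink factory ran and what the inner sink received.
  def run(): (Int, immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("lazy-init")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val factoryCalls = new AtomicInteger(0)

    // The factory is given the first element and builds the real sink.
    val sink = Sink.lazyInit[Int, Future[immutable.Seq[Int]]](
      _ => {
        factoryCalls.incrementAndGet()
        Future.successful(Sink.seq[Int])
      },
      () => Future.successful(immutable.Seq.empty[Int]))

    val elements = Source(1 to 5).runWith(sink).flatMap(identity)

    try {
      val seen = Await.result(elements, 10.seconds)
      (factoryCalls.get, seen)
    } finally system.terminate()
  }
}
```

The factory is consulted once per materialization, so on its own this yields a single lazily created Sink rather than one Sink per group of elements.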

--
Johan
Akka Team

