[akka-user] Re: FSM, how to make stateTimeout relative?

2017-11-01 Thread Christopher Hunt
Perhaps something like this in order to get your FiniteDuration:

import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.duration._

// millis between the given instant and now, wrapped as a
// scala.concurrent.duration.FiniteDuration
val d: FiniteDuration =
  ChronoUnit.MILLIS.between(
    ZonedDateTime.parse("2007-12-03T10:15:30+01:00[Europe/Paris]"),
    ZonedDateTime.now
  ).millis

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Debugging postStop cause within a custom stage

2017-11-01 Thread Christopher Hunt
Hi there,

I've got a situation where I observe postStop being called on my custom 
GraphStage earlier than I'd expect. I'm sure Akka is doing the right thing, 
but what's the best way to debug what is causing the postStop? My 
understanding is that the input and output connections must all have been 
closed for postStop to be invoked, and I'm not expecting the output 
connection to have been closed.
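
One way to narrow this down is a pass-through stage that logs every lifecycle 
callback, so you can see whether upstream completion, downstream cancellation 
or a failure is what leads to postStop. A minimal sketch, assuming Akka 
2.5-era GraphStage APIs and the built-in StageLogging trait (ProbeStage is an 
illustrative name):

import akka.stream._
import akka.stream.stage._

final class ProbeStage[T] extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("Probe.in")
  val out = Outlet[T]("Probe.out")
  override val shape = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
        override def onUpstreamFinish(): Unit = {
          log.debug("upstream finished")
          completeStage()
        }
        override def onUpstreamFailure(ex: Throwable): Unit = {
          log.debug("upstream failed: {}", ex)
          failStage(ex)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
        override def onDownstreamFinish(): Unit = {
          log.debug("downstream cancelled")
          completeStage()
        }
      })
      override def postStop(): Unit = log.debug("postStop")
    }
}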

Thanks for any pointers.

Cheers,
-C



[akka-user] FSM, how to make stateTimeout relative?

2017-11-01 Thread Alexandr Sova
Hello.
Maybe someone else has a similar case, or maybe this could be a good starting 
point for extending Akka's functionality with a pull request?..
The case is: I'm using PersistentFSM and I'd like my actor to stay in its 
last/current state until, say, next Tuesday, or until the beginning of the 
next hour, etc. Luckily, PersistentFSM does not reset timers while staying in 
the same state, so half of the desired functionality is achieved. But looking 
at the current definition of when, I can't see how to set a different state 
timeout each time the state changes to a particular one. It looks like with 
the built-in toolkit I can only set fixed timeouts.
Right now I'm handling this case with system.scheduler.scheduleOnce (run on 
the system dispatcher), which just emits a message/event that I can react to 
and use to go to another state.
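
A minimal sketch of that workaround, using a plain actor for brevity (the 
TimeoutExpired message and WaitUntilTuesday actor are illustrative names, not 
Akka API):

import akka.actor.Actor
import java.time.{DayOfWeek, ZonedDateTime}
import java.time.temporal.{ChronoUnit, TemporalAdjusters}
import scala.concurrent.duration._

case object TimeoutExpired // hypothetical "timer fired" message

class WaitUntilTuesday extends Actor {
  import context.dispatcher

  override def preStart(): Unit = {
    // Compute the wall-clock delay until next Tuesday as a FiniteDuration
    val now = ZonedDateTime.now
    val nextTuesday = now.`with`(TemporalAdjusters.next(DayOfWeek.TUESDAY))
    val delay = ChronoUnit.MILLIS.between(now, nextTuesday).millis
    context.system.scheduler.scheduleOnce(delay, self, TimeoutExpired)
  }

  def receive: Receive = {
    case TimeoutExpired => // react here: e.g. tell the FSM to change state
  }
}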
Am I missing some core functionality, or am I the only one with such a case 
and no one else really needs it? If I'm not the only one, would it be a good 
idea to send a pull request to akka and akka-persistence adding a variation 
of the when method that takes { D => FiniteDuration } instead of just a 
FiniteDuration, to be able to provide different timeouts?
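
A hedged sketch of what the proposed overload could look like (this is not an 
existing Akka method, just the shape of the suggestion, next to the current 
fixed-timeout signature, simplified):

// Current (fixed timeout):
//   def when(stateName: S, stateTimeout: FiniteDuration)(stateFunction: StateFunction): Unit
// Proposed (timeout recomputed from the state data D on each transition into the state):
//   def when(stateName: S, stateTimeout: D => FiniteDuration)(stateFunction: StateFunction): Unit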



Re: [akka-user] Spray->Akka-Http Migration - seeing high 99th percentile latencies post-migration

2017-11-01 Thread Gary Malouf
So the only way I was able to identify the suspicious code was to route a 
percentage of my production traffic to a stubbed route and incrementally add 
back pieces of our implementation. What I found was that the spikes started 
when the entity(as[CaseClassFromJson]) directive was added back in. To figure 
out whether it was the JSON parsing or the 'POST' entity consumption itself, 
I replaced that class with a String - it turns out we experience the latency 
spikes with that as well (on low traffic, as noted earlier in this thread).

I by no means have a deep understanding of streams, but it makes me wonder 
whether the way our code consumes the entity is correct.
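
If entity consumption is the suspect, one experiment is to make the draining 
explicit and isolate it from unmarshalling. A minimal sketch, assuming a 
2017-era Akka HTTP setup (the "foos" path is from the thread; the 3-second 
timeout is an arbitrary assumption):

import akka.http.scaladsl.server.Directives._
import scala.concurrent.duration._

val route =
  path("foos") {
    post {
      // Read the whole request entity into memory first, so the cost of
      // consuming the bytes is separated from JSON parsing and route logic
      extractStrictEntity(3.seconds) { strict =>
        complete(s"received ${strict.data.length} bytes")
      }
    }
  }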

On Monday, October 30, 2017 at 4:27:13 PM UTC-4, Gary Malouf wrote:
>
> Hi Roland - thank you for the tip.  We shrunk the thread pool size down to 
> 1, but were disheartened to still see the latency spikes.  Using Kamon's 
> tracing library (which we validated with various tests to ensure its own 
> numbers are most likely correct), we could not find anything in our code 
> within the route that was causing the latency (it was all attributed to the 
> route as a whole, but to no code segment within it).  
>
> As mentioned earlier, running loads of 100-1000 requests/second completely 
> hides the issue (save for the max latency), as everything through the 99th 
> percentile is under a few milliseconds.
>
> On Tuesday, October 24, 2017 at 2:23:07 AM UTC-4, rkuhn wrote:
>>
>> You could try to decrease your thread pool size to 1 to exclude wakeup 
>> latencies when things (like CPU cores) have gone to sleep.
>>
>> Regards, Roland 
>>
>> Sent from my iPhone
>>
>> On 23. Oct 2017, at 22:49, Gary Malouf  wrote:
>>
>> Yes, it gets parsed using entity(as[]) with spray-json support.  Under a 
>> load test of, say, 1000 requests/second these latencies are not visible in 
>> the percentiles - they are only easy to see because this web server is 
>> currently getting 10-20 requests/second.  Trying to brainstorm whether a 
>> dispatcher needs to be tuned or something of that sort, but I have yet to 
>> see evidence supporting that.
>>
>> path("foos") { 
>> traceName("FooSelection") {
>> entity(as[ExternalPageRequest]) { pr => 
>> val spr = toSelectionPageRequest(pr) 
>> shouldTracePageId(spr.pageId).fold( 
>> Tracer.currentContext.withNewSegment(s"Page-${pr.pageId}", "PageTrace", "
>> kamon") { 
>> processPageRequestAndComplete(pr, spr) 
>> }, 
>> processPageRequestAndComplete(pr, spr) 
>> ) 
>> }
>> } 
>>
>> }
>>
>> On Mon, Oct 23, 2017 at 4:42 PM, Viktor Klang  
>> wrote:
>>
>>> And you consume the entityBytes I presume?
>>>
>>> On Mon, Oct 23, 2017 at 10:35 PM, Gary Malouf  
>>> wrote:
>>>
 It is from when I start the Kamon trace (just inside my 
 path("myawesomepath") declaration) until (theoretically) a 'complete' call 
 is made.  

 path("myawesomepath") {
   traceName("CoolStory") {
 ///do some stuff
  complete("This is great")
 } }

 For what it's worth, this route is a 'POST' call.

 On Mon, Oct 23, 2017 at 4:30 PM, Viktor Klang  
 wrote:

> No, I mean, is it from first-byte-received to last-byte-sent or what?
>
> On Mon, Oct 23, 2017 at 10:22 PM, Gary Malouf  
> wrote:
>
>> We are using percentiles computed via Kamon 0.6.8.  In a very low 
>> request rate environment like this, it takes roughly 1 super slow 
>> request/second to throw off the percentiles (which is what I think is 
>> happening).  
>>
>>
>>
>> On Mon, Oct 23, 2017 at 4:20 PM, Viktor Klang  
>> wrote:
>>
>>> What definition of latency are you using? (i.e. how is it derived)
>>>
>>> On Mon, Oct 23, 2017 at 10:11 PM, Gary Malouf  
>>> wrote:
>>>
 Hi Konrad,

 Our real issue is that we cannot reproduce the results.  The web 
 server we are having latency issues with is under a peak load of 10-15 
 requests/second - obviously not much to deal with.  

 When we use load tests (https://github.com/apigee/apib), it's easy 
 for us to throw a few thousand requests/second at it and get latencies 
 in the ~3 ms range.  We use Kamon to track internal metrics - what we 
 see is that our 95th and 99th percentiles only look bad under the 
 production traffic, not under load tests.  

 I've since used Kamon to print out the actual requests, trying to 
 find any pattern in them that would hint at what's wrong in my own 
 code, but they seem to be completely random.  What we do know is that 
 downgrading to spray gets us 99.9th percentile latencies under 2ms, so 
 something related to the upgrade is causing this.


[akka-user] Re: [akka-persistance cassandra plugin] eventsByTag is not distributed good enough across cassandra cluster

2017-11-01 Thread Christopher Batey
Hi Serhii,

Yes, you're right: the current partition key only works if you have a 
smallish number (10ks) of events per tag per day.

The main messages table works like your #2 for persistence ids, but it all 
happens internally via the partition_nr column rather than the user having 
to manage it.

We're already planning on moving away from using materialised views due to 
the instability of the feature as reported on the cassandra dev mailing 
list (see https://github.com/akka/akka-persistence-cassandra/issues/247).

For the new solution (manually managing a separate table partitioned by tag 
and time window) I had planned to do what you suggest in #3, making the 
partitioning configurable to day, hour or minute. The downside of making it 
minute for everyone is that, when running an eventsByTag query with a low 
number of events, it would need to query many, possibly empty, partitions. 
Not using a materialised view will also allow us to batch writes to the tag 
table, meaning we should be able to support any number of tags.

I should have something for you to try out next week and it would be great 
to get your feedback.

On Wednesday, 1 November 2017 17:17:43 UTC, Serhii Nesteruk wrote:
>
> Hello
>
> eventsByTag in the CassandraReadJournal uses a materialized view to read 
> the events. Currently the materialized view is created by:
>
> CREATE MATERIALIZED VIEW IF NOT EXISTS $eventsByTagViewName$tagId AS
>   SELECT tag$tagId, timebucket, timestamp, persistence_id, partition_nr,
>          sequence_nr, writer_uuid, ser_id, ser_manifest, event_manifest, event, message
>   FROM $tableName
>   WHERE persistence_id IS NOT NULL AND partition_nr IS NOT NULL AND sequence_nr IS NOT NULL
>     AND tag$tagId IS NOT NULL AND timestamp IS NOT NULL AND timebucket IS NOT NULL
>   PRIMARY KEY ((tag$tagId, timebucket), timestamp, persistence_id, partition_nr, sequence_nr)
>   WITH CLUSTERING ORDER BY (timestamp ASC)
>
>
> The partition key is (tag$tagId, timebucket), where timebucket has the 
> following format: DateTimeFormatter.ofPattern("yyyyMMdd")
> I've got a huge amount of events with the same tag. As a result, all 
> events for a day are stored on a single cassandra node; even though all 
> nodes participate in writing the events, this one "materialized view" 
> node slows down the whole system.
>
> Possible workarounds:
> 1. Use a set of tagIds instead of one; a simple hash function can be used 
> to compute the tagId: hash(event) % m. But this may slow down the read 
> side, as the query has to find the events across all nodes in the cluster. 
> 2. A counter-based solution: change the tagId every 10k events.
> 3. Contribute to the cassandra plugin to make the timebucket configurable 
> and add minutes to the pattern.
>
> Does it make sense? I've got some doubts :) because, either way, one of the 
> nodes will "suffer" from the materialized view and slow down the whole system.
>
> I'll be glad to hear any thoughts about it
>
> Thanks,
>   Serhii
>



[akka-user] [akka-persistance cassandra plugin] eventsByTag is not distributed good enough across cassandra cluster

2017-11-01 Thread Serhii Nesteruk
Hello

eventsByTag in the CassandraReadJournal uses a materialized view to read 
the events. Currently the materialized view is created by:

CREATE MATERIALIZED VIEW IF NOT EXISTS $eventsByTagViewName$tagId AS
  SELECT tag$tagId, timebucket, timestamp, persistence_id, partition_nr,
         sequence_nr, writer_uuid, ser_id, ser_manifest, event_manifest, event, message
  FROM $tableName
  WHERE persistence_id IS NOT NULL AND partition_nr IS NOT NULL AND sequence_nr IS NOT NULL
    AND tag$tagId IS NOT NULL AND timestamp IS NOT NULL AND timebucket IS NOT NULL
  PRIMARY KEY ((tag$tagId, timebucket), timestamp, persistence_id, partition_nr, sequence_nr)
  WITH CLUSTERING ORDER BY (timestamp ASC)


The partition key is (tag$tagId, timebucket), where timebucket has the 
following format: DateTimeFormatter.ofPattern("yyyyMMdd")
I've got a huge amount of events with the same tag. As a result, all events 
for a day are stored on a single cassandra node; even though all nodes 
participate in writing the events, this one "materialized view" node slows 
down the whole system.

Possible workarounds:
1. Use a set of tagIds instead of one; a simple hash function can be used to 
compute the tagId: hash(event) % m (see the sketch after this list). But this 
may slow down the read side, as the query has to find the events across all 
nodes in the cluster. 
2. A counter-based solution: change the tagId every 10k events.
3. Contribute to the cassandra plugin to make the timebucket configurable and 
add minutes to the pattern.
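
A hedged sketch of workaround #1, spreading one logical tag over m sub-tags 
via a hash so writes land on m partitions instead of one (subTagCount and the 
"payments" tag name are illustrative, not plugin API):

import akka.persistence.journal.{Tagged, WriteEventAdapter}

class SubTagAdapter extends WriteEventAdapter {
  val subTagCount = 16 // m

  override def manifest(event: Any): String = ""

  // Tag each event with one of m sub-tags derived from a hash of the event
  override def toJournal(event: Any): Any = {
    val subTag = s"payments-${math.abs(event.hashCode) % subTagCount}"
    Tagged(event, Set(subTag))
  }
}

The read side then has to run one eventsByTag query per sub-tag 
("payments-0" .. "payments-15") and merge the resulting sources, which is the 
read-side slowdown mentioned above.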

Does it make sense? I've got some doubts :) because, either way, one of the 
nodes will "suffer" from the materialized view and slow down the whole system.

I'll be glad to hear any thoughts about it

Thanks,
  Serhii



[akka-user] Akka remoting messages get lost after sometime in Docker Swarm

2017-11-01 Thread Akash Mishra
Hi all,

I have the task of maintaining a year-old Akka app that uses remoting. The 
app consists of two services (let's say frontend and backend) communicating 
over remoting with protobuf serialization.

Both are currently running as services in Docker Swarm. When there is only 
one replica of each running, everything works perfectly fine.

But as soon as I scale up the number of replicas, I see a lot of message 
loss. Messages sent by the frontend service to the backend service are not 
seen in the backend's logs, yet there are no errors in the logs.
After a few hours, no messages reach the backend service at all, and I have 
to force-restart both services to get some messages flowing between them 
again.

Here is some information I think would be useful. Let me know if you need 
anything else.

Scala version : 2.11.8
Akka version : 2.4.10
Protobuf version : 2

Remoting config for the frontend service:
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]

    netty.tcp {
      hostname = frontend  # external (logical) hostname
      port = 40            # external (logical) port

      //bind-hostname = ${?CLUSTER_IP}  # internal (bind) hostname
      bind-port = 0
    }
    log-sent-messages = on
    log-received-messages = off
  }

Remoting config for the backend service:
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]

    netty.tcp {
      hostname = backend   # external (logical) hostname
      port = 40            # external (logical) port

      bind-hostname = "0.0.0.0"  # internal (bind) hostname
      bind-port = 40
    }
    log-sent-messages = on
    log-received-messages = on
  }

I thought there was some problem with the config, but then no messages should 
have gotten through at all; instead, the number of messages getting across 
steadily deteriorates as time passes.
The problem could also be with Docker Swarm, but I have many other apps 
communicating over TCP and see no message loss there.

If you guys can help me resolve this problem, it would be great.



Re: [akka-user] Splitting a stream to be consumed by dynamic sinks

2017-11-01 Thread Akka Team
My thought was that maybe you could achieve what you want by composing
existing stages: perhaps zipWithIndex plus a map from index to key, or
statefulMapConcat to decide when to change the key, and then groupBy into a
lazy sink. I can't say I'm sure that will solve your problem, but it may be
worth exploring.
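
A minimal sketch of that composition with the chunking numbers from the
thread, assuming Akka 2.5-era APIs (Sink.lazyInit in particular; Sink.ignore
stands in for the real per-chunk sink):

import java.nio.file.Paths
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future

implicit val system = ActorSystem("chunking")
implicit val mat = ActorMaterializer()

val partSize = 8192     // 8KB parts, as in the example
val partsPerChunk = 625 // parts per chunk, as in the example

FileIO.fromPath(Paths.get("big.file"), partSize)
  .zipWithIndex
  // the key changes every 625 parts; maxSubstreams bounds the number of chunks
  .groupBy(maxSubstreams = 1024, { case (_, idx) => idx / partsPerChunk })
  .map { case (bytes, _) => bytes }
  .to(Sink.lazyInit(
    (_: ByteString) => Future.successful(Sink.ignore), // build the real per-chunk sink here
    () => Future.successful(Done)
  ))
  .run()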

--
Johan
Akka Team

On Tue, Oct 31, 2017 at 9:49 PM, Jason Steenstra-Pickens <
thestein...@gmail.com> wrote:

> Hi Johan,
>
> As far as I can tell this only creates a single Sink based on the first
> element, whereas I need a dynamic number of Sinks. In the past I have used a
> custom version of this which creates a new Source for every element
> (OneToOneOnDemandSink). Neither of these solves the case above though.
>
> I could create a variation of the OneToOneOnDemandSink called
> ManyToOneOnDemandSink that takes a predicate to determine when to create
> the next Sink, however this is still too specialised and we lose the
> composability. For example, I would need to create three variations to cover
> the built-in SubFlow API (groupBy, splitAfter, splitWhen), which is doable
> but pushes all the logic into the Sink and doesn't cover any custom SubFlows.
>
>
> Cheers,
> Jason
>
> On Tuesday, 31 October 2017 22:33:06 UTC+13, Akka Team wrote:
>>
>> Check if Sink.lazyInit doesn't do what you want.
>>
>> --
>> Johan
>> Akka Team
>>
>> On Tue, Oct 31, 2017 at 12:59 AM, Jason Steenstra-Pickens <
>> thest...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I seem to be encountering a recurring problem when using Akka Streams
>>> and haven't found the right client API to solve it.
>>>
>>> The problem usually translates into:
>>>
>>>- I have some possibly infinite Source
>>>- I want to split it into multiple inner Sources based on some
>>>condition such as delimiter, count, or whatever
>>>- I then want to create a Sink for each inner Source dynamically and
>>>run each inner Flow
>>>- I want the backpressure, errors, completion, cancellation and
>>>stuff like that to be shared between the outer Flow and the inner Flow
>>>
>>> There are a few things that come close but all seem to be for a slightly
>>> different use case, such as:
>>>
>>>- splitAfter / splitWhen
>>>- lazy / lazyInit
>>>- the various hubs
>>>
>>> Here is a concrete example:
>>>
>>>1. Reading a file in 8KB parts
>>>2. Splitting the first 625 parts into a separate stream as a "chunk"
>>>3. Create an HTTP source that has a URL containing the chunk number
>>>4. Send the 625 parts to that source
>>>5. Take the next chunk from step 2
>>>
>>> An attempt using a SubFlow looks like:
>>> val chunkSize = fileResponse.chunkSize
>>> val partsPerChunk = chunkSize / partSize
>>> val counts = Source {
>>>   for {
>>> chunk <- 1 to Int.MaxValue
>>> part <- 1 to partsPerChunk
>>>   } yield (chunk, part)
>>> }
>>> val source = FileIO.fromPath(filePath, partSize)
>>>   .zip(counts)
>>>   .splitAfter({ next =>
>>> val (_, (_, part)) = next
>>> part == partsPerChunk
>>>   })
>>>
>>> This is quite nice but then there doesn't seem to be a way of getting to
>>> the inner Flow even if I were to create a custom Sink.
>>>
>>> It would be really awesome if SubFlow had a function like:
>>>   def to[M](f: Out => Graph[SinkShape[Out], M]): C
>>> (although probably without the materialised value since there would be
>>> multiple).
>>>
>>> Is there something obvious that I am missing?
>>>
>>>
>>> Cheers,
>>> Jason
>>>
