Re: [akka-user] Slow reactive-kafka consumer when running multiple streams

2018-02-12 Thread Richard Rodseth
Anyone else running more than one stream in a deployable? Surely the answer
must be yes. Would love to know what the problem might be.


On Wed, Dec 27, 2017 at 10:02 AM, Harshit Patel 
wrote:

> We have an application that uses reactive-kafka to consume messages from a
> kafka topic, do some validations and transformations on the messages, and
> then persist to two different datastores. We are using two different
> reactive kafka consumer streams for each of the datastores, using different
> consumer-group-ids. Each of the stream can be enabled/disabled via
> configuration in the application.
>
> When we run either one of the consumer streams, they seem to run fine and
> are able to consume messages and keep up with the topic reasonably well.
> But when both streams are run together, we see a pretty high latency (4-5
> minutes) and both consumers are unable to keep up with the messages from
> the topic. We are publishing about 15 messages/minute to the topic. We
> are making sure that both consumers are managing blocking where applicable,
> and are using separate dedicated dispatchers for the same.
>
> Any ideas what could be going on ?
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Broadcast and separate backpressure

2017-12-19 Thread Richard Rodseth
Despite the excellent and timely blog post from Colin Breck

http://blog.colinbreck.com/maximizing-throughput-for-akka-streams/

we are having a devil of a time optimizing throughput in a stream which
does the following

1) consume messages containing a channel UUID from Kafka
The topic is partitioned based on channel UUID and we are using
committablePartitionedSource
2) lookup a legacy channel id from db
3) do a write (batched of course) to the same db
4) commit offsets

We've tried all sorts of things like using a Caffeine cache  in 2) and
peeking into the cache and then having a different flow for items with uuid
in cache or not. But because each partition (and hence the stream for each
partition) has a mix of channels, the slow channels (not in cache) slow
down the whole stream. We don't think groupBy is an option because of
ordering.

Our next idea is a custom stage that does a broadcast to two outputs, 1)
dedicated to populating the cache, and 2) for cache lookup and write. That
doesn't solve the merged backpressure problem so we thought we could add a
large buffer to 2.

Short of writing an entirely separate consumer that just populates the
cache, is there any other way to broadcast and somehow decouple the
backpressure of the two outputs from each other?

Any other ideas welcome too. Thanks.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Implementing mailing service using Akka

2017-11-10 Thread Richard Rodseth
You could potentially use the query side of Akka Persistence (eventsByTag)
to stream requests for email sends, feeding failures back into the journal
with the same tag, so they reenter the stream.

https://doc.akka.io/docs/akka/2.5/scala/persistence-query.html

On Fri, Nov 10, 2017 at 5:16 AM, Traven  wrote:

> Hello,
>
> I have a simple Play application that does some statistical analysis of
> data and alerts customers with alerts when something is wrong. Sometimes it
> happens, that sending alert fails (SMTP server down, network errors) etc.
>
> I was thinking of a way to make the email sending process more resilient
> by using some features of Akka Persistence (I'm quite new here), so the
> workflow I was considering:
>
> 1. Persistent actor receives request to send email alerts.
> 2. Alerts are persistence with akka persistence plugin.
> 3. Alerts are sent by the actor.
>
> However, the problem I see in this is that if Actor crashes and replays
> all messages I would send duplicates (or worse) of emails, so I would have
> to clear the messages myself, which kind of misses the point of using actor
> at all.
>
> Is Persistent Actor the right approach to solve these kind of problems?
> Should I solve this problem the `default` way by just using some external
> storage for all non-sent mails (e.g. Kafka queue)?
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] No shutDown on Materializer

2017-10-23 Thread Richard Rodseth
Here you go

https://github.com/akka/akka/issues/23837



On Mon, Oct 23, 2017 at 10:22 PM, Konrad “ktoso” Malawski <
konrad.malaw...@lightbend.com> wrote:

> Adding the shutdown method to the interface you mean?
> I guess it could be considered, please open a ticket.
> Technically it would be breaking to do so, but materializers are somewhat
> special “internal” one might say.
>
> --
> Cheers,
> Konrad 'ktoso <http://kto.so>' Malawski
> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>
> On October 24, 2017 at 14:20:41, Richard Rodseth (rrods...@gmail.com)
> wrote:
>
> Hi Konrad. I don't believe I used that phrase, but thanks for elaborating.
> I will digest what you wrote and discuss it.
> Any thoughts on having a lifecycle method on Materializer vs
> ActorMaterializer?
>
> On Mon, Oct 23, 2017 at 7:39 PM, Konrad “ktoso” Malawski <
> konrad.malaw...@lightbend.com> wrote:
>
>> Hi Richard,
>> Saying that “using Materializer” will cause leaks is somewhat of a weird
>> phrasing, since that sounds just like “using ActorSystem can cause leaks” —
>> well sure, just like “using objects can cause leaks” :-)
>>
>> Let’s step back and analyse where your colleague’s statement came from,
>> and perhaps how/why the misinterpretation happened.
>>
>> I see someone really enjoyed Colin’s awesome talk about streams and
>> actors there :-)
>> // Likely the best talk about those two and how they fit together I’ve
>> seen by the way.
>> // You may want to read his blog http://blog.colinbreck.co
>> m/integrating-akka-streams-and-akka-actors-part-ii/
>> In his talk he explains the lifecycle differences between creating an
>> ActorMaterializer from an ActorSystem,
>> vs. creating an ActorMaterializer from an ActorContext. Note that both
>> are possible, since they are both ActorRefFactories.
>>
>> The ActorMaterializer’s lifecycle is bound to the ActorRefFactory that it
>> was created from.
>>
>> In other words:
>> 1) ActorMaterializer created from ActorSystem
>>   - will terminate all its streams if the ActorSystem terminates
>> 2) ActorMaterializer created from ActorContext
>>   - will terminate all its streams if THAT actor terminates
>>
>> It’s not that one is a leak and the other one isn’t.
>> It’s that the 1st case is good for long lived streams, and such that
>> should out-live an Actor if it was started in one.
>> Say, this is good for persistence query, or akka http, or some shared
>> stream many other parts of your system rely on.
>> The 2nd case is a good choice if the stream and actor are inherently
>> related to one another - say, the stream relates to some “user”, and the
>> actor that started that stream is also “the user”, so if the actor dies we
>> should also kill the stream.
>>
>> Hope this clarifies things.
>>
>> I also submitted a documentation improvement about it right now, so
>> please see here:
>> https://github.com/akka/akka/pull/23836
>>
>> --
>> Cheers,
>> Konrad 'ktoso <http://kto.so>' Malawski
>> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>>
>> On October 24, 2017 at 8:04:10, Richard Rodseth (rrods...@gmail.com)
>> wrote:
>>
>> I just heard from someone who attended a talk at Reactive Summit that you
>> can leak resources if you are restarting a stream within a host actor, and
>> not shutting down the materializer.
>>
>> I've written most of the stream creation logic and messages using
>> Materializer, not ActorMaterializer, and I see that the shutdown method is
>> only on ActorMaterializer.
>>
>> Easy enough to change my types, but I was curious if this was a
>> considered decision.
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] No shutDown on Materializer

2017-10-23 Thread Richard Rodseth
Hi Konrad. I don't believe I used that phrase, but thanks for elaborating.
I will digest what you wrote and discuss it.
Any thoughts on having a lifecycle method on Materializer vs
ActorMaterializer?

On Mon, Oct 23, 2017 at 7:39 PM, Konrad “ktoso” Malawski <
konrad.malaw...@lightbend.com> wrote:

> Hi Richard,
> Saying that “using Materializer” will cause leaks is somewhat of a weird
> phrasing, since that sounds just like “using ActorSystem can cause leaks” —
> well sure, just like “using objects can cause leaks” :-)
>
> Let’s step back and analyse where your colleague’s statement came from,
> and perhaps how/why the misinterpretation happened.
>
> I see someone really enjoyed Colin’s awesome talk about streams and actors
> there :-)
> // Likely the best talk about those two and how they fit together I’ve
> seen by the way.
> // You may want to read his blog http://blog.colinbreck.
> com/integrating-akka-streams-and-akka-actors-part-ii/
> In his talk he explains the lifecycle differences between creating an
> ActorMaterializer from an ActorSystem,
> vs. creating an ActorMaterializer from an ActorContext. Note that both are
> possible, since they are both ActorRefFactories.
>
> The ActorMaterializer’s lifecycle is bound to the ActorRefFactory that it
> was created from.
>
> In other words:
> 1) ActorMaterializer created from ActorSystem
>   - will terminate all its streams if the ActorSystem terminates
> 2) ActorMaterializer created from ActorContext
>   - will terminate all its streams if THAT actor terminates
>
> It’s not that one is a leak and the other one isn’t.
> It’s that the 1st case is good for long lived streams, and such that
> should out-live an Actor if it was started in one.
> Say, this is good for persistence query, or akka http, or some shared
> stream many other parts of your system rely on.
> The 2nd case is a good choice if the stream and actor are inherently
> related to one another - say, the stream relates to some “user”, and the
> actor that started that stream is also “the user”, so if the actor dies we
> should also kill the stream.
>
> Hope this clarifies things.
>
> I also submitted a documentation improvement about it right now, so please
> see here:
> https://github.com/akka/akka/pull/23836
>
> --
> Cheers,
> Konrad 'ktoso <http://kto.so>' Malawski
> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>
> On October 24, 2017 at 8:04:10, Richard Rodseth (rrods...@gmail.com)
> wrote:
>
> I just heard from someone who attended a talk at Reactive Summit that you
> can leak resources if you are restarting a stream within a host actor, and
> not shutting down the materializer.
>
> I've written most of the stream creation logic and messages using
> Materializer, not ActorMaterializer, and I see that the shutdown method is
> only on ActorMaterializer.
>
> Easy enough to change my types, but I was curious if this was a considered
> decision.
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Handling blocking tasks

2017-10-16 Thread Richard Rodseth
Read this if you haven't
https://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management

You can start with a single Actor for JMS and one for file IO, each
configured with a custom dispatcher.
Or use Futures, with custom dispatchers provided as ExecutionContext.

On Mon, Oct 16, 2017 at 8:30 AM, Dries Demeurisse <
dries.demeuri...@gmail.com> wrote:

> Hi,
>
> We are doing a lot of blocking operations: file reading + synchronous JMS
> sending. (Currently not really possible to make use of the async
> alternatives)
> Events come in: for each event, 1-x files are read and parsed until an
> end-event comes in, meanwhile the results are sent to an activeMQ.
> There can be a multitude of these running simultaneous, currently each
> file is 1 actor.
>
> The problem that I've been trying to wrap my head around concerning
> JMS/doing blocking I/O is: (to be clear: just using JmsTemplate for sending
> to JMS)
> - should I go with #xxx actors (1 per file-actor) that send to JMS
> (created on a 'blocking-io-dispatcher')
> - or should I create some sort of pool of 4-x actors that will do the JMS
> work
> - or just 1 actor that only sends to JMS (what if it crashes? can imagine
> some supervision problems)
> - or shouldn't I have seperate actor(s) for JMS and just handle it in a
> future?
> - or work with the Akka Camel integration
> (Alpakka Streaming won't go as it only supports String messages at the
> moment and the legacy code is using JMSObject messages. And even there you
> have the same problem: how many producers do you want to have streaming to
> MQ? + I want to send synchronous via JMS at the moment, because I fear
> randomly losing messages)
>
>
> With the file reading it is about the same problem: it's an internal
> library that does old school java IO with BufferedReaders, so no async
> handling there too:
> - should I create "blocking" actors: just do the I/O and block the actor
> - or should I do the file I/O in futures and send outcome messages to
> myself (or the JMS actor)
>
> Finally the combination: can they both be on the same
> blocking-io-dispatcher or should JMS and File I/O each have their own
> blocking dispatcher? :-)
>
> I suspect Akka may not have been the ideal framework for this sort of
> task, but that was out of my hands (inherited). I just want to use it as
> best as I can.
>
> So, can you provide some advice or maybe things I overlooked or am I just
> overcomplicating things?
>
> Thanks!
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Kafka supervision hell

2017-10-12 Thread Richard Rodseth
Apologies for the alarmist subject, but we're having a very difficult time
getting supervision solid for a committablePartitionedSource.

Latest issue is that someone used the Kafka REST Proxy to post an invalid
message (we *are* using the schema registry). When running our consumer,
that resulted in

Caused by: org.apache.kafka.common.errors.SerializationException: Error
retrieving Avro schema for id 4

Caused by:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Schema not found; error code: 40403
This then manifested in the stream's onFailure handler as

Caused by: java.lang.Exception: Consumer actor terminated

at
akka.kafka.internal.SubSourceLogic$SubSourceStage$$anon$1$$anonfun$preStart$2.apply(SubSourceLogic.scala:150)

At that point there is no way to skip the element by resuming, and the
stream is stuck because it is stopped (and restarted by our host actor)
with no offset committed. The "Consumer actor terminated" exception has no
cause value.

I've tried attaching a Supervision.Decider to the partition source, the
intervals source and the merge thereof.

val mergedSource: Source[CommittableNormalizedInterval, Consumer.Control]
=

  sources.withAttributes(attributesForPartitionSource)

.flatMapMerge(breadth = consumerParallelism, a => a._2.
withAttributes(attributesForIntervalSource))

val result = mergedSource.withAttributes(attributesForIntervalSource)

  .via(partitionFlow)

  .toMat(Sink.ignore)(Keep.both)
But those deciders are not reached, so I appear to have no way to detect
the serialization problem and skip the element.

On Fri, Oct 6, 2017 at 10:30 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I'm using akka-kafka with committablePartitionedSource, meaning I have a
> source of (partition, innersource) tuples.
>
> I'm currently using the flatMapMerge recipe from
>
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.
> html#source-per-partition
>
> val done = Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))
>   .flatMapMerge(maxPartitions, _._2)
>   .via(business)
>   .batch(max = 20, first => 
> CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, 
> elem) =>
> batch.updated(elem.committableOffset)
>   }
>   .mapAsync(3)(_.commitScaladsl())
>   .runWith(Sink.ignore)
>
> If I have a default supervision strategy which resumes, set at the
> materializer level, but want to stop the stream (which in my case restarts
> a host actor with backoff) at the source level, do I need to do a
> withAttributes(attributesForStoppingDecider) at both the outer source and
> inner source leves?
> i.e.
>
> val done = Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))
>   .withAttributes(attributesForStoppingDecider).flatMapMerge(maxPartitions, 
> _._2.withAttributes(attributesForStoppingDecider))
> .via(business)
>
> or can I do it after the flatMapMerge? Question motivated by this open
> ticket which I'm not sure I fully understand yet.
>
> https://github.com/akka/akka/issues/23066
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] flatMapMerge and SupervisionStrategy

2017-10-06 Thread Richard Rodseth
I'm using akka-kafka with committablePartitionedSource, meaning I have a
source of (partition, innersource) tuples.

I'm currently using the flatMapMerge recipe from

https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition

val done = Consumer.committablePartitionedSource(consumerSettings,
Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first =>
CommittableOffsetBatch.empty.updated(first.committableOffset)) {
(batch, elem) =>
batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)

If I have a default supervision strategy which resumes, set at the
materializer level, but want to stop the stream (which in my case restarts
a host actor with backoff) at the source level, do I need to do a
withAttributes(attributesForStoppingDecider) at both the outer source and
inner source leves?
i.e.

val done = Consumer.committablePartitionedSource(consumerSettings,
Subscriptions.topics("topic1"))
  .withAttributes(attributesForStoppingDecider).flatMapMerge(maxPartitions,
_._2.withAttributes(attributesForStoppingDecider))
.via(business)

or can I do it after the flatMapMerge? Question motivated by this open
ticket which I'm not sure I fully understand yet.

https://github.com/akka/akka/issues/23066

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Detecting consumer readiness in Akka Kafka

2017-10-02 Thread Richard Rodseth
We have an integration test that uses Akka Kafka to publish messages to a
topic, that are then consumed by a library under test that also uses Akka
Kafka.

Because autoOffsetReset is "latest", we are trying to defer publishing the
test messages until the consumer is "ready".

The consumer stream is monitored by a host actor, so we have been able to
send it a "stream started" message after run() is called, and then the test
(with host probe) can do an expectMsg for that message, followed by an
expectNoMsg() before starting to publish messages. But this seems to be a
bit hokey/fragile.

Any ideas for a better way?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Source per Partition

2017-09-19 Thread Richard Rodseth
My preliminary testing suggests that the flatMapMerge version will *not*
work if the breadth value is less than maxPartitions.
I don't understand why all partition sources wouldn't continue to emit and
be merged.

On Mon, Sep 11, 2017 at 4:27 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> The first two code samples here show different ways of consuming multiple
> Kafka partitions,
> without really explaining why you would use one or the other.
>
> http://doc.akka.io/docs/akka-stream-kafka/current/consumer.
> html#source-per-partition
>
> The first uses flatMapMerge:
>
> val done = Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))
>   .flatMapMerge(maxPartitions, _._2)
>   .via(business)
>   .batch(max = 20, first => 
> CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, 
> elem) =>
> batch.updated(elem.committableOffset)
>   }
>   .mapAsync(3)(_.commitScaladsl())
>   .runWith(Sink.ignore)
>
> and the second runs a per-partition stream producing a future.
>
> //Consumer group represented as Source[(TopicPartition, Source[Messages])]val 
> consumerGroup =
>   Consumer.committablePartitionedSource(consumerSettings, 
> Subscriptions.topics("topic1"))//Process each assigned partition separately
> consumerGroup.map {
>   case (topicPartition, source) =>
> source
>   .via(business)
>   .toMat(Sink.ignore)(Keep.both)
>   .run()}
>   .mapAsyncUnordered(maxPartitions)(_._2)
>   .runWith(Sink.ignore)
>
> Can anyone say anything about the performance characteristics or other
> pros and cons of these approaches? Also, should there be a custom
> dispatcher for the futures in the second one?
>
> We're currently doing something like the second, but using mapAsync rather
> than emitting the futures (from running the streams-per-partition) into the
> stream. And I actually had a bug using a parallelism factor less than the
> number of partitions and some partitions didn't get processed. It strikes
> me that that in a case like this where the Futures-per-partition only
> complete at times like rebalance, that Future is a somewhat confusing
> abstraction and the flatMapMerge is somewhat more intuitive. I believe in
> example one all messages in all partitions would still be emitted even if I
> erroneously picked too low a value for maxPartitions.
>
> Thanks in advance.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Source per Partition

2017-09-11 Thread Richard Rodseth
The first two code samples here show different ways of consuming multiple
Kafka partitions,
without really explaining why you would use one or the other.

http://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition

The first uses flatMapMerge:

val done = Consumer.committablePartitionedSource(consumerSettings,
Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first =>
CommittableOffsetBatch.empty.updated(first.committableOffset)) {
(batch, elem) =>
batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)

and the second runs a per-partition stream producing a future.

//Consumer group represented as Source[(TopicPartition,
Source[Messages])]val consumerGroup =
  Consumer.committablePartitionedSource(consumerSettings,
Subscriptions.topics("topic1"))//Process each assigned partition
separately
consumerGroup.map {
  case (topicPartition, source) =>
source
  .via(business)
  .toMat(Sink.ignore)(Keep.both)
  .run()}
  .mapAsyncUnordered(maxPartitions)(_._2)
  .runWith(Sink.ignore)

Can anyone say anything about the performance characteristics or other pros
and cons of these approaches? Also, should there be a custom dispatcher for
the futures in the second one?

We're currently doing something like the second, but using mapAsync rather
than emitting the futures (from running the streams-per-partition) into the
stream. And I actually had a bug using a parallelism factor less than the
number of partitions and some partitions didn't get processed. It strikes
me that that in a case like this where the Futures-per-partition only
complete at times like rebalance, that Future is a somewhat confusing
abstraction and the flatMapMerge is somewhat more intuitive. I believe in
example one all messages in all partitions would still be emitted even if I
erroneously picked too low a value for maxPartitions.

Thanks in advance.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Source of Future[Option[A]]

2017-09-11 Thread Richard Rodseth
Off the top of my head: not sure how you are getting the parameters, if
any, to getNextItem, but since xxAsync methods like Source.unfoldAsync and
mapAsync expect a future-returning function, I would use getNextItem
(rather than identity) in one of  those to give you downstream elements of
type Option[A] and then use filter or map or collect to deal with the
Option-ness.

On Mon, Sep 11, 2017 at 1:44 PM, Ori Popowski  wrote:

> Hi,
>
> What is the correct way to create a source from a function which returns
> Future[Option[A]], besides a custom GraphStage?
>
> The source should emit A elements (not wrapped in Future[Option[]]).
>
> I considered a few options, none of them worked
>
> *First: using RestartSource*
>
> RestartSource.withBackoff(1.second, 1.second, 0.2) { () =>
>   Source(Stream.continually(getNextItem.map(_.getOrElse(throw new
> NoSuchElementException.mapAsync(4)(identity)
> }
>
>
>
> This method drops elements
>
>
> *Second: filtering*
>
> Source(Stream.continually(getNextItem)).mapAsync(4)(identity).collect {
>   case Some(e) => a
> }
>
>
>
> This method will create too many unnecessary calls to the resource
> (imagine a database) and cause DOS.
>
>
> So what is the idiomatic / preferable way?
>
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka HTTP Client Unmarshalling - different success and error responses

2017-08-22 Thread Richard Rodseth
Answer appears to be yes. Here's what my for comprehension looks like, and
the order of the DTOs in the Either doesn't matter.

val responseFuture = for {

  requestEntity <- Marshal(userAttributes).to

  val request = HttpRequest(method = HttpMethods.POST, uri = addUserURL,
headers = headers, entity = requestEntity)

  response <- http.singleRequest(request)

  strict <- response.toStrict(1 second)

  unmarshalled <- Unmarshal(strict).to[Either[AddUserResponseDTO,
AddUserErrorDTO]]

} yield (response, unmarshalled)

On Tue, Aug 22, 2017 at 1:00 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> Spoke too soon
>
> Exception in thread "main" akka.http.scaladsl.unmarshalling.Unmarshaller$
> EitherUnmarshallingException: Failed to unmarshal Either[AddUserErrorDTO,
> AddUserResponseDTO] (attempted AddUserResponseDTO first). Right failure:
> Object is missing required member 'id', Left failure: Substream Source
> cannot be materialized more than once
>
> Do I need to place a toStrict somewhere in my for comprehension?
>
>
>
> On Tue, Aug 22, 2017 at 12:28 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Having some success with Either unmarshaller. I don't think it's
>> documented, but here's a test:
>>
>> https://github.com/akka/akka-http/blob/master/akka-http-test
>> s/src/test/scala/akka/http/scaladsl/unmarshalling/UnmarshallingSpec.scala
>>
>> On Tue, Aug 22, 2017 at 11:16 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> I'm trying to use Akka HTTP client (singleRequest) with an API that
>>> returns different JSON for the success and error cases. How is that best
>>> handled? Also, I'm not sure if the explicit marshalling and unmarshalling
>>> below is the right way to do things.
>>>
>>> Any help much appreciated.
>>>
>>> def addUser(baseURL: String, userAttributes: AddUserRequestDTO,
>>> cookieHeader: Cookie)(implicit http: HttpExt, materializer: Materializer,
>>> ec: ExecutionContext) = {
>>> val headers = List(cookieHeader)
>>> val addUserURL = ...
>>>
>>> val responseFuture = for {
>>>   requestEntity <- Marshal(userAttributes).to
>>> val request = HttpRequest(method = HttpMethods.POST, uri =
>>> addUserURL, headers = headers, entity = requestEntity)
>>>   response <- http.singleRequest(request)
>>>   x <- Unmarshal(response.entity).to[AddUserResponseDTO]
>>>   //body <- response.entity.dataBytes.runFold(ByteString(""))(_ ++
>>> _)
>>>   //val x = body.utf8String
>>> } yield (response, x)
>>> responseFuture
>>>
>>>   }
>>>
>>
>>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka HTTP Client Unmarshalling - different success and error responses

2017-08-22 Thread Richard Rodseth
Spoke too soon

Exception in thread "main"
akka.http.scaladsl.unmarshalling.Unmarshaller$EitherUnmarshallingException:
Failed to unmarshal Either[AddUserErrorDTO, AddUserResponseDTO] (attempted
AddUserResponseDTO first). Right failure: Object is missing required member
'id', Left failure: Substream Source cannot be materialized more than once

Do I need to place a toStrict somewhere in my for comprehension?



On Tue, Aug 22, 2017 at 12:28 PM, Richard Rodseth <rrods...@gmail.com>
wrote:

> Having some success with Either unmarshaller. I don't think it's
> documented, but here's a test:
>
> https://github.com/akka/akka-http/blob/master/akka-http-
> tests/src/test/scala/akka/http/scaladsl/unmarshalling/
> UnmarshallingSpec.scala
>
> On Tue, Aug 22, 2017 at 11:16 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I'm trying to use Akka HTTP client (singleRequest) with an API that
>> returns different JSON for the success and error cases. How is that best
>> handled? Also, I'm not sure if the explicit marshalling and unmarshalling
>> below is the right way to do things.
>>
>> Any help much appreciated.
>>
>> def addUser(baseURL: String, userAttributes: AddUserRequestDTO,
>> cookieHeader: Cookie)(implicit http: HttpExt, materializer: Materializer,
>> ec: ExecutionContext) = {
>> val headers = List(cookieHeader)
>> val addUserURL = ...
>>
>> val responseFuture = for {
>>   requestEntity <- Marshal(userAttributes).to
>> val request = HttpRequest(method = HttpMethods.POST, uri =
>> addUserURL, headers = headers, entity = requestEntity)
>>   response <- http.singleRequest(request)
>>   x <- Unmarshal(response.entity).to[AddUserResponseDTO]
>>   //body <- response.entity.dataBytes.runFold(ByteString(""))(_ ++ _)
>>   //val x = body.utf8String
>> } yield (response, x)
>> responseFuture
>>
>>   }
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Akka HTTP Client Unmarshalling - different success and error responses

2017-08-22 Thread Richard Rodseth
Having some success with Either unmarshaller. I don't think it's
documented, but here's a test:

https://github.com/akka/akka-http/blob/master/akka-http-tests/src/test/scala/akka/http/scaladsl/unmarshalling/UnmarshallingSpec.scala

On Tue, Aug 22, 2017 at 11:16 AM, Richard Rodseth <rrods...@gmail.com>
wrote:

> I'm trying to use Akka HTTP client (singleRequest) with an API that
> returns different JSON for the success and error cases. How is that best
> handled? Also, I'm not sure if the explicit marshalling and unmarshalling
> below is the right way to do things.
>
> Any help much appreciated.
>
> def addUser(baseURL: String, userAttributes: AddUserRequestDTO,
> cookieHeader: Cookie)(implicit http: HttpExt, materializer: Materializer,
> ec: ExecutionContext) = {
> val headers = List(cookieHeader)
> val addUserURL = ...
>
> val responseFuture = for {
>   requestEntity <- Marshal(userAttributes).to
> val request = HttpRequest(method = HttpMethods.POST, uri = addUserURL,
> headers = headers, entity = requestEntity)
>   response <- http.singleRequest(request)
>   x <- Unmarshal(response.entity).to[AddUserResponseDTO]
>   //body <- response.entity.dataBytes.runFold(ByteString(""))(_ ++ _)
>   //val x = body.utf8String
> } yield (response, x)
> responseFuture
>
>   }
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka HTTP Client Unmarshalling - different success and error responses

2017-08-22 Thread Richard Rodseth
I'm trying to use Akka HTTP client (singleRequest) with an API that returns
different JSON for the success and error cases. How is that best handled?
Also, I'm not sure if the explicit marshalling and unmarshalling below is
the right way to do things.

Any help much appreciated.

def addUser(baseURL: String, userAttributes: AddUserRequestDTO,
cookieHeader: Cookie)(implicit http: HttpExt, materializer: Materializer,
ec: ExecutionContext) = {
val headers = List(cookieHeader)
val addUserURL = ...

val responseFuture = for {
  requestEntity <- Marshal(userAttributes).to
val request = HttpRequest(method = HttpMethods.POST, uri = addUserURL,
headers = headers, entity = requestEntity)
  response <- http.singleRequest(request)
  x <- Unmarshal(response.entity).to[AddUserResponseDTO]
  //body <- response.entity.dataBytes.runFold(ByteString(""))(_ ++ _)
  //val x = body.utf8String
} yield (response, x)
responseFuture

  }

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Using Akka in Eclipse

2017-07-03 Thread Richard Rodseth
I use Eclipse-based Scala IDE. I import Maven projects. But I often have to
right click on the project and do Maven -> Update Project. I also often
have to do Scala -> Restart Presentation Compiler. Not sure if that's
helpful.

On Mon, Jul 3, 2017 at 11:45 AM, John Arnold  wrote:

> Patrik,
>
> So, I imported the quickstart project as a Maven project. Looking at the
> build path, I see several jars in my ~/.m2/repository directory.
> I guess, and not being familiar enough with Maven nor Lightbend, that when
> I invoked the maven compile exec:exec that it pulled the required jar files
> down.
>
> Probably, my next question would be: Why didn't the config-1.3.1.jar get
> built when I compiled Akka from scratch?
>
>
>
> On Monday, 3 July 2017 13:50:25 UTC-4, Patrik Nordwall wrote:
>>
>> I think Eclipse has a plugin that understands a maven pom and adds the
>> needed dependendencies to the project. You can import a maven project.
>>
>> In this case you are missing a transitive dependency. Trying to
>> add/maintain those manually will be a pain so I suggest that you let the
>> IDE/plugin handle it for you.
>>
>> /Patrik
>> mån 3 juli 2017 kl. 18:50 skrev John Arnold :
>>
>>> Using the Quickstart example (which builds and runs using Maven), I
>>> decided to try it in Eclipse.
>>>
>>> I added the 2 jars from my akka build, akka-actor_2.11-2.5-SNAPSHOT.jar
>>> and akka-testkit_2.11-2.5-SNAPSHOT.jar, and the sample program built
>>> successfully.
>>>
>>> Running produced an error:
>>>
>>>Exception in thread "main" java.lang.NoClassDefFoundError:
>>> com/typesafe/config/ConfigFactory
>>>
>>> Suggestions?
>>>
>>> --
>>> >> Read the docs: http://akka.io/docs/
>>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] ActorPublisher GraphStage alternative?

2017-05-27 Thread Richard Rodseth
In case it helps:

https://groups.google.com/d/topic/akka-user/AgVHHnl9ub4/discussion
https://github.com/akka/akka/issues/22742

On Fri, May 26, 2017 at 1:34 PM, Curt Siffert  wrote:

>
> Hi, I see in the docs for 2.5.2 that ActorPublisher/ActorSubscriber will
> be deprecated.
>
> In my (still beginning) experiments with akka streams I used
> ActorPublisher as a way to help create some back pressure controls while
> consuming messages from an external queue. This worked just by consuming
> the queue like normal and then for each message consumed, sending a message
> to ActorPublisher.
>
> Without using ActorPublisher, I can use a Source.actorRef, but that
> doesn't have back pressure controls.
>
> I know the recommended alternative to ActorPublisher is to use a custom
> graph stage and I have started experimenting with that but so far I don't
> see how to meet the ActorPublisher use case with it. So far it doesn't seem
> like a custom Source has an ActorRef type signature like Source.actorRef
> does. Once the custom stage is created, can I send a "tell" message to it
> the way I did to ActorPublisher? Or am I supposed to use Source.actorRef
> and then funnel it through the custom stage to get the back pressure
> controls?
>
> Sorry if my question is muddled, I am still making my way through this.
> :-) I recognize this is a bit weird since ideally the back pressure
> controls would be applied to the queueing tech itself.
>
> Curt
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] ANNOUNCE: New Akka documentation, website and Akka 2.5.2 released

2017-05-24 Thread Richard Rodseth
Looks nice.

However, if I go to

http://akka.io/docs/

and type "Dispatcher" in the search box, nothing happens. There's no Search
button, and Enter does nothing either.
Chrome, OS X.
Am I missing something?




On Wed, May 24, 2017 at 7:56 AM, Arnout Engelen <
arnout.enge...@lightbend.com> wrote:

> Dear hakkers,
>
> We are excited to announce not only a new patch release of Akka 2.5 but
> also brand new akka.io website and documentation.
>
> New
> documentation
>
> One conclusion from the community survey 2016
>  was
> that the getting started experience of Akka must be improved. Therefore we
> have written completely new Getting Started
>  and
> Quickstart  guides.
>
> A special thanks to Endre Varga  who wrote
> most of the Getting Started guide.
>
> You find the landing page for the documentation at http://akka.io/docs/
>
> The documentation tooling has been changed to Paradox
> . The markdown syntax and the zero
> installation will make it easier for contributors to help out with
> improving the documentation. Such contributions are very welcome and you
> find instructions in CONTRIBUTING.md
> .
>
> Akka
> 2.5.2
>
> If you are using Cluster Sharding with "remembering entities" there is one
> important change that requires that you make a change to your code when
> updating to 2.5.2. To solve a critical issue #22289
>  that could result in
> duplicate entities when the extractShardId function is changed (e.g.
> between deployments) there is now a new message Shard.StartEntity(
> entityId)that the extractShardId must understand. See example in Scala
> 
>  or Java
> 
> .
>
> Some other notable improvements and bug fixes of Akka 2.5.2 are:
>
>- Sharding entities not restarted after full cluster restart in ddata
>mode, #22868 
>- Bug in sharding snapshot deletion, #22916
>
>- Report cause for Akka IO TCP CommandFailed, #22954
>
>- BackoffSupervisor can reply to messages when child is stopped, #21213
>
>
> The development branch of Akka Typed has been merged back to master and is
> included in Akka 2.5.2, see the blog post series starting with Akka
> Typed: Hello World in the new API
> . The APIs may still
> change but we are confident that most of it is now stable. We will continue
> the work by integrating with other parts of Akka, such as Streams,
> Persistence, and Cluster.
>
> A total of 77 issues were closed since 2.5.1. The complete list of closed
> issues can be found on the 2.5.2
>  and 2.5.99-TYPED-M1
>  milestones on
> github.
>
> 
> Credits
>
> For this release we had the help of 27 committers – thank you all very
> much!
>
> commits  added  removed
>  44   7021 4706 Patrik Nordwall
>  29  2045322586 Arnout Engelen
>  18   2290 2122 Martynas Mickevičius
>  10   3173 1547 Johan Andrén
>   6272  233 Konrad `ktoso` Malawski
>   5118   17 Richard Imaoka
>   4137   37 andrea
>   3862  426 Peter Vlugter
>   2 37   11 Roland Kuhn
>   2 26   21 Hawstein
>   2 12   12 Philippus Baalman
>   14763 drewhk
>   1 66   21 Johannes Rudolph
>   1 22   22 stanislav
>   1 289 Chris Martin
>   1 185 Nafer Sanabria
>   1  88 Age Mooij
>   1  88 Lukas Phaf
>   1  95 Zahari Dichev
>   1  22 Masaru Nomura
>   1  11 Bartosz Kowalik
>   1  11 Sebastian Gavril
>   1  11 Alena Varkockova
>   1  11 André Laszlo
>   1  10 Packt
>   1  00 synox
>   1  00 Dale Wijnand
>
> Happy hakking!
>
> – The Akka Team
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check 

[akka-user] Reactive Kafka and Confluent

2017-05-11 Thread Richard Rodseth
The Confluent add ons like Schema Registry are open source and the client
library and Kafka itself are forks of the Apache versions.

reactive-kafka seems to be "reactive Kafka for Apache Kafka".

I'm wondering if folks are using it in conjunction with the Confluent
offerings.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] CoordinatedShutdown

2017-05-11 Thread Richard Rodseth
Thanks. I think I was missing my println output because I launched and
stopped in Eclipse.
Cntrl-C from the terminal worked fine.

On Wed, May 10, 2017 at 10:23 PM, Patrik Nordwall <patrik.nordw...@gmail.com
> wrote:

> You can call run from anywhere, e.g. instead of calling system.terminate()
> you can call CoordinatedShutdown(system).run().
>
> By default it will also run automatically when the JVM process exists,
> e.g. you call java.lang.System.exit or on kill SIGTERM signal.
>
> run is idempotent so you can call it concurrently, several times.
>
> On Thu, May 11, 2017 at 12:49 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I was considering using CoordinatedShutdown to shutdown Kamon, either
>> with PhaseBeforeActorSystemTerminate or with addJvmShutdownHook
>>
>> But it seems I have to explicitly call run(). Where would I do that?
>> Not using Akka Cluster in this app.
>> Thanks.
>>
>>   val system = ActorSystem("test")
>>
>>   CoordinatedShutdown(system).addTask(
>>
>> CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
>> "shutdownKamon") { () =>
>>
>>   println(s"Shutting down Kamon")
>>
>>   //Kamon.shutdown()
>>
>>   Future.successful(Done)
>>
>> }
>>
>> CoordinatedShutdown(system).addJvmShutdownHook {
>>
>> println("custom JVM shutdown hook...")
>>
>>   }
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] CoordinatedShutdown

2017-05-10 Thread Richard Rodseth
I was considering using CoordinatedShutdown to shutdown Kamon, either with
PhaseBeforeActorSystemTerminate or with addJvmShutdownHook

But it seems I have to explicitly call run(). Where would I do that?
Not using Akka Cluster in this app.
Thanks.

  val system = ActorSystem("test")

  CoordinatedShutdown(system).addTask(

CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdownKamon")
{ () =>

  println(s"Shutting down Kamon")

  //Kamon.shutdown()

  Future.successful(Done)

}

CoordinatedShutdown(system).addJvmShutdownHook {

println("custom JVM shutdown hook...")

  }

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Restart stream with backoff

2017-05-08 Thread Richard Rodseth
I was hoping to promote a pattern in my organization, using reactive-kafka
(source-per-partition). Some of my colleagues are comfortable with actors,
but it would be great if others could be introduced to the streams APIs
without learning all about actors.

Am I correct that there is currently no way to restart a stream with
exponential backoff, other than running the stream in a "host" actor that
runs the stream and is fed stream elements using alsoTo ?

Backoff supervision is one of the items mentioned here:
https://github.com/akka/akka/issues/19950

Anyone else come up with a creative solution?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] experiences with serialization: json vs protobuf vs avro vs kryo...

2017-05-01 Thread Richard Rodseth
Because we're using Avro for Kafka, we also looked at avro and avro4s, but
ended up using Protobuf (with Maven plugin) for Akka Persistence. Mapping
between the IDL-generated classes and case classes is indeed unfortunate,
but no other issues so far.

On Mon, May 1, 2017 at 12:54 AM, Joost de Vries 
wrote:

> I'm trying to find a suitable serialization for akka persistence.
>
> There are a lot of options but not all of them are equally used and are as
> mature. F.i. we picked avro with avro4s but hit some problems that weren't
> easy to fix.
>
> Of course then it's tempting to fall back on json: it's familiar from
> rest, easy to analyse and there's stamina for schema evolution.
>
> But I wonder if there's another option. So my question is: which non json
> serialisation are you using and what are your experiences with schema
> evolution and otherwise?
> A special concern is that we'd rather not have to duplicate our case class
> structure in IDL files.
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: ProducerStage and CassandraSink

2017-04-28 Thread Richard Rodseth
Oh, right. Thanks. I understand now why the custom stage was written.

On Fri, Apr 28, 2017 at 9:03 AM, Akka Team <akka.offic...@gmail.com> wrote:

> The problem is that Future is not really suited for asynchronous work
> since there is no way to chain actions onto it, the only thing you can do
> is to poll or block until it is completed. It would have to be
> CompletableFuture.
>
> --
> Johan
> Akka Team
>
> On Fri, Apr 28, 2017 at 5:54 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Thanks. There *is* a version of send in Kafka that returns a Java Future.
>>
>> public java.util.concurrent.Future> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/RecordMetadata.html>>
>>  send(ProducerRecord 
>> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>>  
>> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>,V
>>  
>> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>>
>>  record)
>>
>> Either way, I wish one could write with maximum possible throughput
>> without having a "parallelism" value to tune, but I guess Kafka would have
>> to provide a Reactive Streams compatible API, or Akka would have to drop
>> down to the Kafka protocol level, assuming that is still a thing.
>>
>> On Fri, Apr 28, 2017 at 7:09 AM, Akka Team <akka.offic...@gmail.com>
>> wrote:
>>
>>> The Cassandra session provides an async method session.executeAsync
>>> which returns a CompletableFuture/Future which makes it usable with
>>> mapAsync while Kafka has a callback based async api, where you trigger an
>>> action and pass a callback to execute when the action has completed. It
>>> would not be possible to use the latter with mapAsync.
>>>
>>> --
>>> Johan
>>> Akka Team
>>>
>>> On Sat, Apr 8, 2017 at 1:08 AM, Richard Rodseth <rrods...@gmail.com>
>>> wrote:
>>>
>>>> I'm still curious what, if any, advantages the ProducerStage as in
>>>> reactive-kafka has over using mapAsync as in CassandraSink. Anyone?
>>>>
>>>>
>>>> On Fri, Mar 31, 2017 at 7:40 AM, Richard Rodseth <rrods...@gmail.com>
>>>> wrote:
>>>>
>>>>> I was looking at the implementation of the reactive-kafka
>>>>> ProducerStage. Very interesting.
>>>>>
>>>>> https://github.com/akka/reactive-kafka/blob/master/core/src/
>>>>> main/scala/akka/kafka/internal/ProducerStage.scala
>>>>>
>>>>> It seems this pattern could be used for any external API that returns
>>>>> a Future.
>>>>>
>>>>> By contrast, the Alpakka CassandraSink uses mapAsyncUnordered with a
>>>>> user-supplied parallelism value.
>>>>>
>>>>> https://github.com/akka/alpakka/blob/master/cassandra/src/ma
>>>>> in/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
>>>>>
>>>>> Am I correct in understanding that the ProducerStage generates demand
>>>>> (absence of backpressure) only as Futures complete, whereas the
>>>>> CassandraSink generates backpressure (absence of demand) when the
>>>>> parallelism value is reached.
>>>>>
>>>>> In the ProducerStage case, would the max number of in-flight Futures
>>>>> be limited  by the internal buffer size, as described here? :
>>>>>
>>>>> http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-rate
>>>>> .html#Internal_buffers_and_their_effect
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>>> urrent/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>>> p/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to akka-user@googlegroups.com.
>>>> Visit th

Re: [akka-user] Re: ProducerStage and CassandraSink

2017-04-28 Thread Richard Rodseth
Thanks. There *is* a version of send in Kafka that returns a Java Future.

public java.util.concurrent.Futurehttps://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/RecordMetadata.html>>
send(ProducerRecord
<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>,V
<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>>
record)

Either way, I wish one could write with maximum possible throughput without
having a "parallelism" value to tune, but I guess Kafka would have to
provide a Reactive Streams compatible API, or Akka would have to drop down
to the Kafka protocol level, assuming that is still a thing.

On Fri, Apr 28, 2017 at 7:09 AM, Akka Team <akka.offic...@gmail.com> wrote:

> The Cassandra session provides an async method session.executeAsync which
> returns a CompletableFuture/Future which makes it usable with mapAsync
> while Kafka has a callback based async api, where you trigger an action and
> pass a callback to execute when the action has completed. It would not be
> possible to use the latter with mapAsync.
>
> --
> Johan
> Akka Team
>
> On Sat, Apr 8, 2017 at 1:08 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I'm still curious what, if any, advantages the ProducerStage as in
>> reactive-kafka has over using mapAsync as in CassandraSink. Anyone?
>>
>>
>> On Fri, Mar 31, 2017 at 7:40 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> I was looking at the implementation of the reactive-kafka ProducerStage.
>>> Very interesting.
>>>
>>> https://github.com/akka/reactive-kafka/blob/master/core/src/
>>> main/scala/akka/kafka/internal/ProducerStage.scala
>>>
>>> It seems this pattern could be used for any external API that returns a
>>> Future.
>>>
>>> By contrast, the Alpakka CassandraSink uses mapAsyncUnordered with a
>>> user-supplied parallelism value.
>>>
>>> https://github.com/akka/alpakka/blob/master/cassandra/src/ma
>>> in/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
>>>
>>> Am I correct in understanding that the ProducerStage generates demand
>>> (absence of backpressure) only as Futures complete, whereas the
>>> CassandraSink generates backpressure (absence of demand) when the
>>> parallelism value is reached.
>>>
>>> In the ProducerStage case, would the max number of in-flight Futures be
>>> limited  by the internal buffer size, as described here? :
>>>
>>> http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-rate
>>> .html#Internal_buffers_and_their_effect
>>>
>>> Thanks.
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Offset type and resumable projections for 2.5

2017-04-25 Thread Richard Rodseth
Fair enough. Thanks.

On Tue, Apr 25, 2017 at 4:50 AM, Akka Team <akka.offic...@gmail.com> wrote:

> I don't find that sample odd or doing something inherently bad.
>
> You might choose to serialize the whole Offset subtype, but you might also
> decide to serialize the long-value which will take considerable less space,
> may be possible to read directly from a database etc. I consider that
> design decisions, the sample is just there to show that you would start
> from a stored offset and store offset as the query progress.
>
> --
> Johan
> Akka Team
>
> On Thu, Apr 13, 2017 at 11:30 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Congrats on the release of 2.5.
>>
>> Isn't the documentation and accompanying sample for Resumable Projections
>> a bit odd?
>>
>> http://doc.akka.io/docs/akka/2.5.0/scala/persistence-query.h
>> tml#Resumable_projections
>> https://github.com/akka/akka/blob/master/akka-docs/rst/scala
>> /code/docs/persistence/query/PersistenceQueryDocSpec.scala
>>
>> Why would we be constructing a Sequence? Wouldn't the offset be stored in
>> a plugin-agnostic way and just passed back? In my case, the offset is in a
>> Protobuf persistence event, and I'm assuming I would have to convert it
>> (whether Long or TimeUUID) to a ByteString and use a Protobuf field of type
>> "bytes".
>>
>>
>>1. bidProjection.latestOffset.foreach { startFromOffset =>
>>2. readJournal
>>3. .eventsByTag("bid", Sequence(startFromOffset))
>>4. .mapAsync(8) { envelope => (writer ? envelope.event).map(_ =>
>>envelope.offset) }
>>5. .mapAsync(1) { offset => bidProjection.saveProgress(offset) }
>>6. .runWith(Sink.ignore)
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] GraphStage instead of ActorPublisher/ActorSubscriber

2017-04-18 Thread Richard Rodseth
I've read that ActorPublisher and ActorSubscriber are deprecated in favour
of GraphStage, but haven't found examples of how to accomplish this. This
section of the documentation is a stub:

http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html#Integration_with_actors

I have several scenarios where I need to thread a stream through an actor,
but still want to take advantage of backpressure.

So far I'm using mapAsync() and ask()

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Offset type and resumable projections for 2.5

2017-04-13 Thread Richard Rodseth
Congrats on the release of 2.5.

Isn't the documentation and accompanying sample for Resumable Projections a
bit odd?

http://doc.akka.io/docs/akka/2.5.0/scala/persistence-query.html#Resumable_projections
https://github.com/akka/akka/blob/master/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala

Why would we be constructing a Sequence? Wouldn't the offset be stored in a
plugin-agnostic way and just passed back? In my case, the offset is in a
Protobuf persistence event, and I'm assuming I would have to convert it
(whether Long or TimeUUID) to a ByteString and use a Protobuf field of type
"bytes".


   1. bidProjection.latestOffset.foreach { startFromOffset =>
   2. readJournal
   3. .eventsByTag("bid", Sequence(startFromOffset))
   4. .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope
   .offset) }
   5. .mapAsync(1) { offset => bidProjection.saveProgress(offset) }
   6. .runWith(Sink.ignore)

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Has akka-http has abandoned per request actors in favor an anti-pattern DSL?

2017-04-08 Thread Richard Rodseth
DSL.
Route contains:

  onSuccess(requestHandler ? RequestHandler.AskForStatus) { ... }

On Sat, Apr 8, 2017 at 2:31 PM, kraythe  wrote:

> And you accomplish this with the low level or DSL api.
>
> On Saturday, April 8, 2017 at 12:56:26 AM UTC-5, rrodseth wrote:
>>
>> I use per-request actors. The only ask() is in the route, and is sent to
>> a RequestHandler actor. The RequestHandler creates a per-request actor with
>> the sender as a Props/Constructor parameter ("requestor"), and sends it a
>> "Run" message.
>>
>> The per-request actor can coordinate with as many actors as it needs to.
>> It can use "self" as a "replyTo" property of messages it sends out (tell).
>> It sends the final result back to its "requestor" and stops itself.
>>
>> With Spray I didn't need the outermost ask(), but I think it's fine
>> having one at the outermost layer.
>>
>> On Fri, Apr 7, 2017 at 9:45 PM, kraythe  wrote:
>>
>>> A very interesting read and I appreciate you showing examples. What I do
>>> wonder about there is what the situation is when the server has to do more
>>> than simply return information from a database or other similar web rest-ui
>>> simple examples. For example, I have processes that require aggregating
>>> data from several actors to complete the process. Now I could do these in a
>>> series of ASKs but then that would feel like a complete anti-pattern to me.
>>> I would rather model the system as an entirely one way calling pattern
>>> where the only future is in the final completion of the route. So here is
>>> the challenge:
>>>
>>> You have a route called /customerInfo which finds the search parameters
>>> out of the JSON body that is passed to the api. Then it has to invoke calls
>>> to two ProductManagerActor and InventoryManagerActor. These actors will
>>> send back responses which might result in the task being done but will be
>>> much more likely to require calling actor ShippingStatusActor several
>>> times to ask for different pieces of data. Once you have collected all of
>>> the data then finally you can assemble the JSON response. Yes. I could
>>> model this imperatively with futures and using the ask pattern inside the
>>> route but then I am massively constrained resources and I have never found
>>> that to be scalable under load. Indeed I try to avoid futures as much as I
>>> can because of these difficulties. I would prefer instead to have an actor
>>> handle the request, recieving messages from the three actors and then when
>>> it has assembled all of the data it would generate a response or if there
>>> is a timeout or exception it would generate other responses.
>>>
>>> Now using the route DSL I wonder how you would manage this process
>>> within your route files. At some point you have to jump off the DSL and
>>> offload the task to an actor. For example you can decode the parameters and
>>> the body and so on using the route but then you have to start the process
>>> of collecting this massive amount of data to send back and then you are off
>>> the route and your options are now limited as to how you can respond. You
>>> cant simply say inside the DataCollectionActor to invoke the bad
>>> request response, through the DSL. you now have to use the low level API to
>>> signal a response.
>>>
>>> I would be curious how you could handle this use case with the DSL.
>>>
>>> -- Robert
>>>
>>>
>>> On Tuesday, April 4, 2017 at 2:38:52 AM UTC-5, Daniel Stoner wrote:

 Hi Kraythe,

 Perhaps it helps to see a real world example that we've been working on
 - with a good number of routes involved.

 This is from our AkkaHttpServer class. It's job is to inject all the
 routes (ordersV2, searchv3, searchTerms, persistence) which consist of
 around 6 actual endpoints per injected class - into the right point in the
 hierarchy (Below the oAuth2 authenticator and any request/response loggers
 and whatnot that you may need).

 We define index.html and healthcheck route in this class since they are
 one liners that live above oAuth2 security otherwise we would also inject
 them independently.

 Route indexRoute = get(() -> route(pathSingleSlash(() ->
 getFromResource("web/index.html";
 Route healthCheck = get(() -> path(PATH_HEALTH_CHECK, () ->
 extractRequestContext(healthCheckHandler::handle)));

 Route apis = route(
 indexRoute,
 healthCheck,
 oauth2Authentication(
 accessTokenVerifier,
 route(
 ordersV2,
 searchV3,
 searchTerms,
 persistence
 )
 )
 );

 return logRequestResult(
 

Re: [akka-user] Re: Has akka-http has abandoned per request actors in favor an anti-pattern DSL?

2017-04-07 Thread Richard Rodseth
I use per-request actors. The only ask() is in the route, and is sent to a
RequestHandler actor. The RequestHandler creates a per-request actor with
the sender as a Props/Constructor parameter ("requestor"), and sends it a
"Run" message.

The per-request actor can coordinate with as many actors as it needs to. It
can use "self" as a "replyTo" property of messages it sends out (tell). It
sends the final result back to its "requestor" and stops itself.

With Spray I didn't need the outermost ask(), but I think it's fine having
one at the outermost layer.

On Fri, Apr 7, 2017 at 9:45 PM, kraythe  wrote:

> A very interesting read and I appreciate you showing examples. What I do
> wonder about there is what the situation is when the server has to do more
> than simply return information from a database or other similar web rest-ui
> simple examples. For example, I have processes that require aggregating
> data from several actors to complete the process. Now I could do these in a
> series of ASKs but then that would feel like a complete anti-pattern to me.
> I would rather model the system as an entirely one way calling pattern
> where the only future is in the final completion of the route. So here is
> the challenge:
>
> You have a route called /customerInfo which finds the search parameters
> out of the JSON body that is passed to the api. Then it has to invoke calls
> to two ProductManagerActor and InventoryManagerActor. These actors will
> send back responses which might result in the task being done but will be
> much more likely to require calling actor ShippingStatusActor several
> times to ask for different pieces of data. Once you have collected all of
> the data then finally you can assemble the JSON response. Yes. I could
> model this imperatively with futures and using the ask pattern inside the
> route but then I am massively constrained resources and I have never found
> that to be scalable under load. Indeed I try to avoid futures as much as I
> can because of these difficulties. I would prefer instead to have an actor
> handle the request, recieving messages from the three actors and then when
> it has assembled all of the data it would generate a response or if there
> is a timeout or exception it would generate other responses.
>
> Now using the route DSL I wonder how you would manage this process within
> your route files. At some point you have to jump off the DSL and offload
> the task to an actor. For example you can decode the parameters and the
> body and so on using the route but then you have to start the process of
> collecting this massive amount of data to send back and then you are off
> the route and your options are now limited as to how you can respond. You
> cant simply say inside the DataCollectionActor to invoke the bad request
> response, through the DSL. you now have to use the low level API to signal
> a response.
>
> I would be curious how you could handle this use case with the DSL.
>
> -- Robert
>
>
> On Tuesday, April 4, 2017 at 2:38:52 AM UTC-5, Daniel Stoner wrote:
>>
>> Hi Kraythe,
>>
>> Perhaps it helps to see a real world example that we've been working on -
>> with a good number of routes involved.
>>
>> This is from our AkkaHttpServer class. It's job is to inject all the
>> routes (ordersV2, searchv3, searchTerms, persistence) which consist of
>> around 6 actual endpoints per injected class - into the right point in the
>> hierarchy (Below the oAuth2 authenticator and any request/response loggers
>> and whatnot that you may need).
>>
>> We define index.html and healthcheck route in this class since they are
>> one liners that live above oAuth2 security otherwise we would also inject
>> them independently.
>>
>> Route indexRoute = get(() -> route(pathSingleSlash(() ->
>> getFromResource("web/index.html";
>> Route healthCheck = get(() -> path(PATH_HEALTH_CHECK, () ->
>> extractRequestContext(healthCheckHandler::handle)));
>>
>> Route apis = route(
>> indexRoute,
>> healthCheck,
>> oauth2Authentication(
>> accessTokenVerifier,
>> route(
>> ordersV2,
>> searchV3,
>> searchTerms,
>> persistence
>> )
>> )
>> );
>>
>> return logRequestResult(
>> this::requestMethodAsInfo,
>> this::rejectionsAsInfo,
>> () -> handleExceptions(
>> exceptionHandlerLogAndReturnInternalError(),
>> () -> handleRejections(
>> rejectionHandlerLogAndReturnNotFound(),
>> () -> apis
>> )
>> )
>> );
>>
>> Note the *handleExceptions *and *handleRejections* methods. Basically if

[akka-user] Re: ProducerStage and CassandraSink

2017-04-07 Thread Richard Rodseth
I'm still curious what, if any, advantages the ProducerStage as in
reactive-kafka has over using mapAsync as in CassandraSink. Anyone?

On Fri, Mar 31, 2017 at 7:40 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I was looking at the implementation of the reactive-kafka ProducerStage.
> Very interesting.
>
> https://github.com/akka/reactive-kafka/blob/master/
> core/src/main/scala/akka/kafka/internal/ProducerStage.scala
>
> It seems this pattern could be used for any external API that returns a
> Future.
>
> By contrast, the Alpakka CassandraSink uses mapAsyncUnordered with a
> user-supplied parallelism value.
>
> https://github.com/akka/alpakka/blob/master/cassandra/
> src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala
>
> Am I correct in understanding that the ProducerStage generates demand
> (absence of backpressure) only as Futures complete, whereas the
> CassandraSink generates backpressure (absence of demand) when the
> parallelism value is reached.
>
> In the ProducerStage case, would the max number of in-flight Futures be
> limited  by the internal buffer size, as described here? :
>
> http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-
> rate.html#Internal_buffers_and_their_effect
>
> Thanks.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] ProducerStage and CassandraSink

2017-03-31 Thread Richard Rodseth
I was looking at the implementation of the reactive-kafka ProducerStage.
Very interesting.

https://github.com/akka/reactive-kafka/blob/master/core/src/main/scala/akka/kafka/internal/ProducerStage.scala

It seems this pattern could be used for any external API that returns a
Future.

By contrast, the Alpakka CassandraSink uses mapAsyncUnordered with a
user-supplied parallelism value.

https://github.com/akka/alpakka/blob/master/cassandra/src/main/scala/akka/stream/alpakka/cassandra/scaladsl/CassandraSink.scala

Am I correct in understanding that the ProducerStage generates demand
(absence of backpressure) only as Futures complete, whereas the
CassandraSink generates backpressure (absence of demand) when the
parallelism value is reached.

In the ProducerStage case, would the max number of in-flight Futures be
limited  by the internal buffer size, as described here? :

http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-rate.html#Internal_buffers_and_their_effect

Thanks.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Having Sharded Persistent Actors reset persistence data on initialization

2017-03-10 Thread Richard Rodseth
>> Exactly, so you would have to have another actor (at least one) on each
node that is always alive and subscribes to pubsub and delegates to the
ShardRegion.

So no straightforward way to broadcast to all  instances of an aggregate?


On Thu, Mar 9, 2017 at 10:52 PM, Patrik Nordwall <patrik.nordw...@gmail.com>
wrote:

>
>
> On Thu, Mar 9, 2017 at 7:42 PM, Richard Ney <kamisama@gmail.com>
> wrote:
>
>> In a sharded system the life-cycle of the actor is controlled by the
>> Shard actor. So if you are subscribing to distributed pub/sub in your
>> actor's constructor or receive method and you passivate there is no way for
>> the Shard actor to respawn your passivated actor because the message isn't
>> coming through the ShardRegion.
>>
>
> Exactly, so you would have to have another actor (at least one) on each
> node that is always alive and subscribes to pubsub and delegates to the
> ShardRegion.
>
>
>> Also we found out that unless we call "unsubscribe" in the preStop method
>> we were leaking actor references since the mediator doesn't know your actor
>> no longer exists and just sends the messages to dead-letters.
>>
>
> That sounds wrong. The mediator is watching the subscribers and remove
> them when they are terminated. Please open an issue if you are sure and
> have a repeatable test.
>
>
>>
>> On Thu, Mar 9, 2017 at 9:51 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> Is it true that
>>>
>>>- Passivated (sleeping) actors aren't able to listen to the pub/sub
>>>topic.
>>>
>>> ?
>>>
>>> Unfortunately there's no answer here:
>>> http://stackoverflow.com/questions/40782570/can-akka-distrib
>>> utedpubsub-deal-with-passivated-subscribers
>>>
>>> On Wed, Mar 8, 2017 at 6:18 PM, Richard Ney <kamisama@gmail.com>
>>> wrote:
>>>
>>>> Environment:
>>>>
>>>>- Distributed cluster with persistence sharded actors.
>>>>- Persistent actors are setup to Passivate when inactive.
>>>>- All actors have the Backoff Supervisor wrapping the actual
>>>>persistent actor
>>>>- Shard regions are common across our users
>>>>
>>>> Problem:
>>>>
>>>>- At times it's required to reset the actor states
>>>>- Live actors currently listen to a Distributed Pub/Sub topics for
>>>>the reset message at which point they reset state and save a snapshot of
>>>>the state.
>>>>- Passivated (sleeping) actors aren't able to listen to the pub/sub
>>>>topic.
>>>>- Sending message to restore actor just to reset its state is a
>>>>needless load on the persistence data store since we could play back 
>>>> 1000s
>>>>of messages before wiping state.
>>>>
>>>> Question:
>>>>
>>>>- How can I either send a message or pass a dynamic parameter into
>>>>a Sharded Persistent Actor so I can disable recovery before the actor
>>>>starts loading from the data store?
>>>>
>>>>
>>>> -Thanks
>>>>
>>>>
>>>> Richard
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>>> urrent/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>>> p/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to akka-user@googlegroups.com.
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>>

Re: [akka-user] Having Sharded Persistent Actors reset persistence data on initialization

2017-03-09 Thread Richard Rodseth
Is it true that

   - Passivated (sleeping) actors aren't able to listen to the pub/sub
   topic.

?

Unfortunately there's no answer here:
http://stackoverflow.com/questions/40782570/can-akka-distributedpubsub-deal-with-passivated-subscribers

On Wed, Mar 8, 2017 at 6:18 PM, Richard Ney  wrote:

> Environment:
>
>- Distributed cluster with persistence sharded actors.
>- Persistent actors are setup to Passivate when inactive.
>- All actors have the Backoff Supervisor wrapping the actual
>persistent actor
>- Shard regions are common across our users
>
> Problem:
>
>- At times it's required to reset the actor states
>- Live actors currently listen to a Distributed Pub/Sub topics for the
>reset message at which point they reset state and save a snapshot of the
>state.
>- Passivated (sleeping) actors aren't able to listen to the pub/sub
>topic.
>- Sending message to restore actor just to reset its state is a
>needless load on the persistence data store since we could play back 1000s
>of messages before wiping state.
>
> Question:
>
>- How can I either send a message or pass a dynamic parameter into a
>Sharded Persistent Actor so I can disable recovery before the actor starts
>loading from the data store?
>
>
> -Thanks
>
>
> Richard
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] PersistentActor and BackoffSupervisor problen

2017-03-08 Thread Richard Rodseth
I think they are referring to child actors. i.e. it's OK for actor A to
have a backoff supervisor (A can be persistent or not), but if it has a
child which is a persistent actor then the caveats apply.

On Wed, Mar 8, 2017 at 6:09 AM, Alan Burlison 
wrote:

> On 08/03/17 14:03, Alan Burlison wrote:
>
> That's not in the docs, it's just a source code comment - I'd have thought
>> it was important enough a restriction to be in the scaladocs?
>>
>> Also, if BackoffSupervisor shouldn't be used with PersistentActors, what
>> is
>> the correct way of dealing with IO failures in persistent actors?
>>
>
> Plus it directly contradicts this:
>
> http://doc.akka.io/docs/akka/current/scala/persistence.html#Failures
>
> "It is better to stop the actor and after a back-off timeout start it
> again. The akka.pattern.BackoffSupervisor actor is provided to support such
> restarts."
>
> ???
>
>
> --
> Alan Burlison
> --
>
> --
>
>>  Read the docs: http://akka.io/docs/
>>>  Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>>  Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>>
>> --- You received this message because you are subscribed to the
> Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: CircuitBreaker and ExponentialBackOff in Streams

2017-02-28 Thread Richard Rodseth
Bump.
Has anyone implemented a CircuitBreaker-like stage
(mapAsyncWithCircuitBreaker perhaps?) that would provide backpressure?​

On Mon, Feb 27, 2017 at 2:18 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> What's the current thinking on this? This ticket is open:
>
> https://github.com/akka/akka/issues/15307
>
> I have an infinite stream communicating with an external service. If the
> external service goes down or some threshhold of errors is reached, it may
> be appropriate to stop the actor that runs the stream and restart it after
> some period.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] CircuitBreaker and ExponentialBackOff in Streams

2017-02-27 Thread Richard Rodseth
What's the current thinking on this? This ticket is open:

https://github.com/akka/akka/issues/15307

I have an infinite stream communicating with an external service. If the
external service goes down or some threshhold of errors is reached, it may
be appropriate to stop the actor that runs the stream and restart it after
some period.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] eventsByTag as merge

2017-02-25 Thread Richard Rodseth
Persistent Actor A consumes from Kafka and stores some events (let's call
them ProcessingRequested). Persistent Actor B runs a processing stream
whose source is tagged events from Persistent Actor A. As messages exit the
processing stream they are fed back to B which persists ProcessingSucceeded
or ProcessingFailed. Works like a charm.

However, we want to feed some failed elements back into the processing
stream in a resilient fashion. It occurred to us that we could tag
ProcessingFailed with the same tag as ProcessingRequested.

This feels nice and simple but also a little artificial. My sense is the
primary use case for eventsByTag is for merging the events of multiple
instances of an Aggregate.

I haven't mastered GraphStage, or AtLeastOnceDelivery or Source.queue,
which I imagine might offer an alternative design. Any comments?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] snapshotSequenceNr

2017-02-10 Thread Richard Rodseth
What is the purpose of this method? It just returns lastSequenceNr.

I'm implementing an API to delete unneeded events and snapshots.

It seems I will have to store (in memory) the metadata provided with
SnapShotOffer. That's fine, I'm just curious what the purpose
of snapshotSequenceNr is.

I also need to account for projections which might not be caught up. That
got me wondering if the team had ever considered adding some awareness of
projections to PersistentActor.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Snapshot and Journal Cleanup

2017-01-30 Thread Richard Rodseth
We are using the akka-persistence-jdbc plugin. I know there are APIs for
deleting snapshots and events. But I'm wondering if it's essential that a
"journal manager" tool use these, or could we just issue SQL commands. What
are others doing?

And yes, I know that the events often have business value.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Saving snapshots

2017-01-26 Thread Richard Rodseth
Oops. Thanks!

Sent from my phone - will be brief

> On Jan 26, 2017, at 8:09 AM, Patrik Nordwall <patrik.nordw...@gmail.com> 
> wrote:
> 
> Sharded actors can have constructor. You define the Props for them.
>> tors 26 jan. 2017 kl. 16:45 skrev Richard Rodseth <rrods...@gmail.com>:
>> Also, a little off topic, but since I have your attention :)
>> I'd like to have snapShotInterval be configurable, but sharded actors don't 
>> take constructor parameters and I'd rather not load config in the actor.
>> Do people typically have an Init message for sharded actors, that contains 
>> this sort of config?
>> 
>> On Thu, Jan 26, 2017 at 7:40 AM, Richard Rodseth <rrods...@gmail.com> wrote:
>> Good to know!
>> 
>> https://github.com/akka/akka/issues/22233
>> 
>> On Thu, Jan 26, 2017 at 7:25 AM, Patrik Nordwall <patrik.nordw...@gmail.com> 
>> wrote:
>> There is already a counter (the sequence number) that you can use.
>> 
>> if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0)
>> 
>> On Thu, Jan 26, 2017 at 3:53 PM, Konrad Malawski 
>> <konrad.malaw...@lightbend.com> wrote:
>> B, it's async anyway.
>> 
>> -- 
>> Konrad `ktoso` Malawski
>> Akka @ Lightbend
>> 
>>> On 26 January 2017 at 15:48:58, Richard Rodseth (rrods...@gmail.com) wrote:
>>> 
>>> a) or b) ?
>>> 
>>> a)
>>> 
>>> if (count % snapShotInterval == 0) {
>>> 
>>>   self ! SaveSnapShot
>>> 
>>> }
>>> 
>>> 
>>> 
>>> b)
>>> 
>>> if (count % snapShotInterval == 0) {
>>> 
>>>   saveSnapshot(...)
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: 
>>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google Groups 
>>> "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an 
>>> email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>> 
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>> 
>> 
>> 
>> -- 
>> Patrik Nordwall
>> Akka Tech Lead
>> Lightbend -  Reactive apps on the JVM
>> Twitter: @patriknw
>> 
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>> 
>> 
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
&

Re: [akka-user] Saving snapshots

2017-01-26 Thread Richard Rodseth
Also, a little off topic, but since I have your attention :)
I'd like to have snapShotInterval be configurable, but sharded actors don't
take constructor parameters and I'd rather not load config in the actor.
Do people typically have an Init message for sharded actors, that contains
this sort of config?

On Thu, Jan 26, 2017 at 7:40 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> Good to know!
>
> https://github.com/akka/akka/issues/22233
>
> On Thu, Jan 26, 2017 at 7:25 AM, Patrik Nordwall <
> patrik.nordw...@gmail.com> wrote:
>
>> There is already a counter (the sequence number) that you can use.
>>
>> if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0)
>>
>> On Thu, Jan 26, 2017 at 3:53 PM, Konrad Malawski <
>> konrad.malaw...@lightbend.com> wrote:
>>
>>> B, it's async anyway.
>>>
>>> --
>>> Konrad `ktoso` Malawski
>>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>>
>>> On 26 January 2017 at 15:48:58, Richard Rodseth (rrods...@gmail.com)
>>> wrote:
>>>
>>> a) or b) ?
>>>
>>> a)
>>>
>>> if (count % snapShotInterval == 0) {
>>>
>>>   self ! SaveSnapShot
>>>
>>> }
>>>
>>>
>>> b)
>>>
>>> if (count % snapShotInterval == 0) {
>>>
>>>   saveSnapshot(...)
>>>
>>> }
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>>
>> Patrik Nordwall
>> Akka Tech Lead
>> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
>> Twitter: @patriknw
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Saving snapshots

2017-01-26 Thread Richard Rodseth
Good to know!

https://github.com/akka/akka/issues/22233

On Thu, Jan 26, 2017 at 7:25 AM, Patrik Nordwall <patrik.nordw...@gmail.com>
wrote:

> There is already a counter (the sequence number) that you can use.
>
> if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0)
>
> On Thu, Jan 26, 2017 at 3:53 PM, Konrad Malawski <
> konrad.malaw...@lightbend.com> wrote:
>
>> B, it's async anyway.
>>
>> --
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 26 January 2017 at 15:48:58, Richard Rodseth (rrods...@gmail.com)
>> wrote:
>>
>> a) or b) ?
>>
>> a)
>>
>> if (count % snapShotInterval == 0) {
>>
>>   self ! SaveSnapShot
>>
>> }
>>
>>
>> b)
>>
>> if (count % snapShotInterval == 0) {
>>
>>   saveSnapshot(...)
>>
>> }
>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Snapshots

2017-01-08 Thread Richard Rodseth
I've implemented snapshotting every n events by checking if offset modulo n
== 0.
Also, to answer myself, the snapshot is a Protobuf object as well.


On Sun, Jan 8, 2017 at 5:14 PM, Tal Pressman  wrote:

> Still, I don't think you can get snapshots that were saved by the
> "original" persistent actor through the persistence query. It's fairly easy
> to check, though - just save a snapshot and see if you can read it from the
> query.
>
> In any case, what I meant is that instead of only saving the offset in the
> read-side, you could save the offset along with the accumulated state and
> handle that snapshot as part of the persistent actor's recovery (by
> handling SnapshotOffer). Note that saving the offsets (and snapshots) is
> just an optimization, and you don't have to save them after *every* handled
> event. You could save them every number of events, after a certain time
> interval (just make sure they are saved together, so the information
> contained in the snapshot is in sync with the offset).
>
> Tal
>
>
> On Thursday, January 5, 2017 at 4:21:36 PM UTC+2, rrodseth wrote:
>
>> I have a PersistentActor which is a read side projection that runs a
>> persistence-query stream. It's only state is an offset, a Long. Protobuf
>> serialization is used for persistence messages.
>>
>> I'm preparing to add snapshotting. The examples show a "snap" message
>> handled by calling saveSnapshot.
>>
>> Am I correct that I should make the internal state passed to
>> saveSnapshot() a Protobuf object as well?
>>
>> And when would "snap" (or case object SaveSnapShot) typically be sent?
>>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Snapshots

2017-01-08 Thread Richard Rodseth
Sorry this wasn't more clear. I *am* saving the offset of a
persistence-query. I'm saving it in the persistent actor that issues the
query.

On Sun, Jan 8, 2017 at 9:48 AM, Tal Pressman  wrote:

> If I am not mistaken, persistence-query does not support snapshots so you
> would have to save the snapshot along with your offset, and then for
> recovery you should load it manually and restart the persistence-query from
> the saved offset.
>
> Tal
>
>
> On Thursday, January 5, 2017 at 4:21:36 PM UTC+2, rrodseth wrote:
>>
>> I have a PersistentActor which is a read side projection that runs a
>> persistence-query stream. It's only state is an offset, a Long. Protobuf
>> serialization is used for persistence messages.
>>
>> I'm preparing to add snapshotting. The examples show a "snap" message
>> handled by calling saveSnapshot.
>>
>> Am I correct that I should make the internal state passed to
>> saveSnapshot() a Protobuf object as well?
>>
>> And when would "snap" (or case object SaveSnapShot) typically be sent?
>>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Snapshots

2017-01-05 Thread Richard Rodseth
I have a PersistentActor which is a read side projection that runs a
persistence-query stream. It's only state is an offset, a Long. Protobuf
serialization is used for persistence messages.

I'm preparing to add snapshotting. The examples show a "snap" message
handled by calling saveSnapshot.

Am I correct that I should make the internal state passed to saveSnapshot()
a Protobuf object as well?

And when would "snap" (or case object SaveSnapShot) typically be sent?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] PersistentShardCoordinator - Persistence failure

2016-12-20 Thread Richard Rodseth
Any tips for debugging one of these?

PersistentShardCoordinator - Persistence failure when replaying events for
persistenceId

Right now, our logging config is broken so I can't even get more info with
loglevel = DEBUG, but since I'm desperate I'm hoping there's a common
mistake someone will recognize.

It's a one-node cluster.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Persistence with Avro

2016-12-09 Thread Richard Rodseth
Yes, I've read the docs. Would you say most Lightbend customers use
Protobuf? I was leaning towards Avro since it seems to be popular in Kafka
land.

On Fri, Dec 9, 2016 at 6:13 AM, Konrad Malawski <
konrad.malaw...@typesafe.com> wrote:

> Hi Richard,
> I fear not that many people use Avro.
> But I can in general answer your question about SerializerWithStringManif
> est - yeah it's usually the right way, since it's the most evolvable (you
> could even remove classes since it's not keyed by classes strictly).
> So that seems fine.
>
> Making sure you've seen: http://doc.akka.io/docs/akka/current/scala/
> persistence-schema-evolution.html
>
> Happy hakking
>
> On Wed, Dec 7, 2016 at 12:53 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Anyone else using Avro and avro4s to serialize persistence events? I was
>> able to get the SerializerWithStringManifest below working (with generic
>> serialize and deserialize methods). Should I be extending something other
>> than SerializerWithStringManifest, given that Avro on its own helps with
>> schema evolution?
>>
>> Note: avro4s seems very promising, although I did run into this limitation
>> https://github.com/sksamuel/avro4s/issues/75
>>
>> class PersistenceEventsAvroSerializer extends
>> SerializerWithStringManifest {
>>
>>   val FooManifest = "Foo"
>>
>>   val BarManifest = "Bar"
>>
>>   def identifier = 1234567
>>
>>  override def manifest(obj: AnyRef): String =
>>
>> obj match {
>>
>>   case _: Bar => BarManifest
>>
>>   case _: Foo => FooManifest
>>
>> }
>>
>>  override def toBinary(obj: AnyRef): Array[Byte] = {
>>
>>obj match {
>>
>>   case foo: Foo => serialize[Foo](foo)
>>
>>   case bsr: Bar => serialize[Bar](bar)
>>
>> }
>>
>>   }
>>
>>  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
>> {
>>
>> manifest match {
>>
>>   case FooManifest => deserialize[Foo](bytes)
>>
>>   case BarManifest => deserialize[Bar](bytes)
>>
>> }
>>
>>   }
>>
>>   private def serialize[T](data: T)(implicit s: SchemaFor[T], r:
>> ToRecord[T]): Array[Byte] = {
>>
>> val baos = new ByteArrayOutputStream
>>
>> val output = AvroOutputStream.binary[T](baos)
>>
>> output.write(data)
>>
>> output.close()
>>
>> val result = baos.toByteArray()
>>
>> result
>>
>>   }
>>
>>
>>   private def deserialize[T](data: Array[Byte])(implicit s:
>> SchemaFor[T], r: FromRecord[T]): T = {
>>
>>
>> val input = AvroInputStream.binary[T](data)
>>
>> val result: T = if (input.iterator.isEmpty) {
>>
>>   ??? // TODO Check
>>
>> } else {
>>
>>   input.iterator.toSeq.head
>>
>> }
>>
>> result
>>
>>   }
>>
>>
>> }
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Persistence with Avro

2016-12-06 Thread Richard Rodseth
Anyone else using Avro and avro4s to serialize persistence events? I was
able to get the SerializerWithStringManifest below working (with generic
serialize and deserialize methods). Should I be extending something other
than SerializerWithStringManifest, given that Avro on its own helps with
schema evolution?

Note: avro4s seems very promising, although I did run into this limitation
https://github.com/sksamuel/avro4s/issues/75

class PersistenceEventsAvroSerializer extends SerializerWithStringManifest {

  val FooManifest = "Foo"

  val BarManifest = "Bar"

  def identifier = 1234567

 override def manifest(obj: AnyRef): String =

obj match {

  case _: Bar => BarManifest

  case _: Foo => FooManifest

}

 override def toBinary(obj: AnyRef): Array[Byte] = {

   obj match {

  case foo: Foo => serialize[Foo](foo)

  case bsr: Bar => serialize[Bar](bar)

}

  }

 override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

manifest match {

  case FooManifest => deserialize[Foo](bytes)

  case BarManifest => deserialize[Bar](bytes)

}

  }

  private def serialize[T](data: T)(implicit s: SchemaFor[T], r:
ToRecord[T]): Array[Byte] = {

val baos = new ByteArrayOutputStream

val output = AvroOutputStream.binary[T](baos)

output.write(data)

output.close()

val result = baos.toByteArray()

result

  }


  private def deserialize[T](data: Array[Byte])(implicit s: SchemaFor[T],
r: FromRecord[T]): T = {


val input = AvroInputStream.binary[T](data)

val result: T = if (input.iterator.isEmpty) {

  ??? // TODO Check

} else {

  input.iterator.toSeq.head

}

result

  }


}

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Unit test for Persistent Actor Replay

2016-12-02 Thread Richard Rodseth
Thank you! I was not aware of awaitAssert. I think the following should do
the trick.

watch(definitionReader)

system.stop(definitionReader)

expectTerminated(definitionReader)

var definitionReader2: ActorRef = null

awaitAssert {

  definitionReader2 = system.actorOf(actorProps, actorName)

  definitionReader2 should not be (null)

}

definitionReader2 ! DefinitionReader.GetState

On Fri, Dec 2, 2016 at 11:25 AM, Patrik Nordwall <patrik.nordw...@gmail.com>
wrote:

> For testing purposes you might need to optionally override that way of
> defining the persistenceId by passing it in as constructor parameter.
>
> Another way is to retry the actorOf in the test until it is successful.
> awaitAssert in testkit can be useful for that.
>
> fre 2 dec. 2016 kl. 20:16 skrev Richard Rodseth <rrods...@gmail.com>:
>
>> I see now that
>>
>> https://github.com/akka/akka/blob/release-2.4/akka-
>> persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala
>>
>> works by generating a unique id in beforeEach().
>>
>> But I wonder how all your customers test recovery of their persistent
>> sharded entities when they follow your example of using self.path.name
>> in the persistenceId ?
>>
>> http://doc.akka.io/docs/akka/2.4/scala/cluster-sharding.html#An_Example
>>
>>   override val persistenceId: String = "XYZ-" + self.path.name
>>
>>
>> On Fri, Dec 2, 2016 at 10:59 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>> I wondered about that, but the persistence id is derived from the name
>> (it's an aggregate root)
>>
>>   override val persistenceId: String = "XYZ-" + self.path.name
>>
>> On Fri, Dec 2, 2016 at 10:53 AM, Patrik Nordwall <
>> patrik.nordw...@gmail.com> wrote:
>>
>> You can start the second actor with a different actor name but with the
>> same persistenceId.
>>
>> If you don't specify the actor name a unique name will be used.
>>
>> /Patrik
>>
>> fre 2 dec. 2016 kl. 19:43 skrev Richard Rodseth <rrods...@gmail.com>:
>>
>> I'm also looking here:
>> https://github.com/akka/akka/blob/release-2.4/akka-
>> persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala
>>
>> There's a lot of inherited code in these tests, and it's unclear to me
>> how the issue is avoided here:
>>
>> abstract class PersistentActorSpec(config: Config) extends
>> PersistenceSpec(config) with ImplicitSender {
>> import PersistentActorSpec._
>> override protected def beforeEach() {
>> super.beforeEach()
>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>> persistentActor ! Cmd("a")
>> persistentActor ! GetState
>> expectMsg(List("a-1", "a-2"))
>> }
>> "A persistent actor" must {
>> "recover from persisted events" in {
>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>> persistentActor ! GetState
>> expectMsg(List("a-1", "a-2"))
>> }
>>
>>
>>
>> On Fri, Dec 2, 2016 at 7:42 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>> I see that TestActorRef is not an option with PersistentActor
>>
>> http://doc.akka.io/docs/akka/2.4.1/scala/testing.html#
>> Synchronous_Unit_Testing_with_TestActorRef
>>
>> This is not the clearest paragraph, but perhaps provides an option:
>>
>> "It may also be required during testing, when the test subject depends
>> on being instantiated at a specific path. In that case it is best to mock
>> its supervisor so that it will forward the Terminated message to the
>> appropriate point in the test procedure, enabling the latter to await
>> proper deregistration of the name."
>> http://doc.akka.io/docs/akka/current/general/addressing.
>> html#Reusing_Actor_Paths
>>
>> Only other thing I can think of is to use akka-persistence-query APIs to
>> check the event store, but that's not exactly testing recover.
>>
>> Thanks for any other ideas.
>>
>>
>>
>> On Thu, Dec 1, 2016 at 11:24 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
>>
>> Well, it is solved in Akka Typed: removing system.actorOf() and
>> system.stop() is the only way to "solve" this. The reason is that the
>> semantics of these methods are incompatible with the distributed nature of
>> the ActorSystem.
>>
>> Regards, Roland
>>
>> Sent from my iPhone
>>
>> On 2 Dec 2016, at 08:12, Viktor Klang <viktor.kl...@gmail.com> wrote

Re: [akka-user] Unit test for Persistent Actor Replay

2016-12-02 Thread Richard Rodseth
I see now that

https://github.com/akka/akka/blob/release-2.4/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala

works by generating a unique id in beforeEach().

But I wonder how all your customers test recovery of their persistent
sharded entities when they follow your example of using self.path.name in
the persistenceId ?

http://doc.akka.io/docs/akka/2.4/scala/cluster-sharding.html#An_Example

  override val persistenceId: String = "XYZ-" + self.path.name


On Fri, Dec 2, 2016 at 10:59 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I wondered about that, but the persistence id is derived from the name
> (it's an aggregate root)
>
>   override val persistenceId: String = "XYZ-" + self.path.name
>
> On Fri, Dec 2, 2016 at 10:53 AM, Patrik Nordwall <
> patrik.nordw...@gmail.com> wrote:
>
>> You can start the second actor with a different actor name but with the
>> same persistenceId.
>>
>> If you don't specify the actor name a unique name will be used.
>>
>> /Patrik
>>
>> fre 2 dec. 2016 kl. 19:43 skrev Richard Rodseth <rrods...@gmail.com>:
>>
>>> I'm also looking here:
>>> https://github.com/akka/akka/blob/release-2.4/akka-persisten
>>> ce/src/test/scala/akka/persistence/PersistentActorSpec.scala
>>>
>>> There's a lot of inherited code in these tests, and it's unclear to me
>>> how the issue is avoided here:
>>>
>>> abstract class PersistentActorSpec(config: Config) extends
>>> PersistenceSpec(config) with ImplicitSender {
>>> import PersistentActorSpec._
>>> override protected def beforeEach() {
>>> super.beforeEach()
>>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>>> persistentActor ! Cmd("a")
>>> persistentActor ! GetState
>>> expectMsg(List("a-1", "a-2"))
>>> }
>>> "A persistent actor" must {
>>> "recover from persisted events" in {
>>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>>> persistentActor ! GetState
>>> expectMsg(List("a-1", "a-2"))
>>> }
>>>
>>>
>>>
>>> On Fri, Dec 2, 2016 at 7:42 AM, Richard Rodseth <rrods...@gmail.com>
>>> wrote:
>>>
>>> I see that TestActorRef is not an option with PersistentActor
>>>
>>> http://doc.akka.io/docs/akka/2.4.1/scala/testing.html#Synchr
>>> onous_Unit_Testing_with_TestActorRef
>>>
>>> This is not the clearest paragraph, but perhaps provides an option:
>>>
>>> "It may also be required during testing, when the test subject depends
>>> on being instantiated at a specific path. In that case it is best to mock
>>> its supervisor so that it will forward the Terminated message to the
>>> appropriate point in the test procedure, enabling the latter to await
>>> proper deregistration of the name."
>>> http://doc.akka.io/docs/akka/current/general/addressing.html
>>> #Reusing_Actor_Paths
>>>
>>> Only other thing I can think of is to use akka-persistence-query APIs to
>>> check the event store, but that's not exactly testing recover.
>>>
>>> Thanks for any other ideas.
>>>
>>>
>>>
>>> On Thu, Dec 1, 2016 at 11:24 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
>>>
>>> Well, it is solved in Akka Typed: removing system.actorOf() and
>>> system.stop() is the only way to "solve" this. The reason is that the
>>> semantics of these methods are incompatible with the distributed nature of
>>> the ActorSystem.
>>>
>>> Regards, Roland
>>>
>>> Sent from my iPhone
>>>
>>> On 2 Dec 2016, at 08:12, Viktor Klang <viktor.kl...@gmail.com> wrote:
>>>
>>> Good question!
>>>
>>> The notification of its death hadn't reached the closest of kin (its
>>> parent) so they hadn't even put it in the ground when you proposed to
>>> create a new one and call it the same thing, so they (rightfully so)
>>> objected to that suggestion.
>>>
>>> This trips people up every now and then, but I've never seen a proposal
>>> to solve it.
>>>
>>> --
>>> Cheers,
>>> √
>>>
>>> On Dec 2, 2016 02:44, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>>
>>> We have a unit test for a persistent actor, which instantiates a second
>>> instance after waiting for termination o

Re: [akka-user] Unit test for Persistent Actor Replay

2016-12-02 Thread Richard Rodseth
I wondered about that, but the persistence id is derived from the name
(it's an aggregate root)

  override val persistenceId: String = "XYZ-" + self.path.name

On Fri, Dec 2, 2016 at 10:53 AM, Patrik Nordwall <patrik.nordw...@gmail.com>
wrote:

> You can start the second actor with a different actor name but with the
> same persistenceId.
>
> If you don't specify the actor name a unique name will be used.
>
> /Patrik
>
> fre 2 dec. 2016 kl. 19:43 skrev Richard Rodseth <rrods...@gmail.com>:
>
>> I'm also looking here:
>> https://github.com/akka/akka/blob/release-2.4/akka-
>> persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala
>>
>> There's a lot of inherited code in these tests, and it's unclear to me
>> how the issue is avoided here:
>>
>> abstract class PersistentActorSpec(config: Config) extends
>> PersistenceSpec(config) with ImplicitSender {
>> import PersistentActorSpec._
>> override protected def beforeEach() {
>> super.beforeEach()
>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>> persistentActor ! Cmd("a")
>> persistentActor ! GetState
>> expectMsg(List("a-1", "a-2"))
>> }
>> "A persistent actor" must {
>> "recover from persisted events" in {
>> val persistentActor = namedPersistentActor[Behavior1PersistentActor]
>> persistentActor ! GetState
>> expectMsg(List("a-1", "a-2"))
>> }
>>
>>
>>
>> On Fri, Dec 2, 2016 at 7:42 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>> I see that TestActorRef is not an option with PersistentActor
>>
>> http://doc.akka.io/docs/akka/2.4.1/scala/testing.html#
>> Synchronous_Unit_Testing_with_TestActorRef
>>
>> This is not the clearest paragraph, but perhaps provides an option:
>>
>> "It may also be required during testing, when the test subject depends
>> on being instantiated at a specific path. In that case it is best to mock
>> its supervisor so that it will forward the Terminated message to the
>> appropriate point in the test procedure, enabling the latter to await
>> proper deregistration of the name."
>> http://doc.akka.io/docs/akka/current/general/addressing.
>> html#Reusing_Actor_Paths
>>
>> Only other thing I can think of is to use akka-persistence-query APIs to
>> check the event store, but that's not exactly testing recover.
>>
>> Thanks for any other ideas.
>>
>>
>>
>> On Thu, Dec 1, 2016 at 11:24 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
>>
>> Well, it is solved in Akka Typed: removing system.actorOf() and
>> system.stop() is the only way to "solve" this. The reason is that the
>> semantics of these methods are incompatible with the distributed nature of
>> the ActorSystem.
>>
>> Regards, Roland
>>
>> Sent from my iPhone
>>
>> On 2 Dec 2016, at 08:12, Viktor Klang <viktor.kl...@gmail.com> wrote:
>>
>> Good question!
>>
>> The notification of its death hadn't reached the closest of kin (its
>> parent) so they hadn't even put it in the ground when you proposed to
>> create a new one and call it the same thing, so they (rightfully so)
>> objected to that suggestion.
>>
>> This trips people up every now and then, but I've never seen a proposal
>> to solve it.
>>
>> --
>> Cheers,
>> √
>>
>> On Dec 2, 2016 02:44, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>
>> We have a unit test for a persistent actor, which instantiates a second
>> instance after waiting for termination of the first.
>>
>> watch(definitionReader)
>>
>> system.stop(definitionReader)
>>
>> expectTerminated(definitionReader)
>>
>> val notificationDefinitionReader2 = system.actorOf(actorProps, actorName)
>>
>> This test fails intermittently on Bamboo
>>
>> actor name [name1] is not unique!
>>
>> Any ideas?
>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>> current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-u

Re: [akka-user] Unit test for Persistent Actor Replay

2016-12-02 Thread Richard Rodseth
I'm also looking here:
https://github.com/akka/akka/blob/release-2.4/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala

There's a lot of inherited code in these tests, and it's unclear to me how
the issue is avoided here:

abstract class PersistentActorSpec(config: Config) extends
PersistenceSpec(config) with ImplicitSender {
import PersistentActorSpec._
override protected def beforeEach() {
super.beforeEach()
val persistentActor = namedPersistentActor[Behavior1PersistentActor]
persistentActor ! Cmd("a")
persistentActor ! GetState
expectMsg(List("a-1", "a-2"))
}
"A persistent actor" must {
"recover from persisted events" in {
val persistentActor = namedPersistentActor[Behavior1PersistentActor]
persistentActor ! GetState
expectMsg(List("a-1", "a-2"))
}



On Fri, Dec 2, 2016 at 7:42 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I see that TestActorRef is not an option with PersistentActor
>
> http://doc.akka.io/docs/akka/2.4.1/scala/testing.html#
> Synchronous_Unit_Testing_with_TestActorRef
>
> This is not the clearest paragraph, but perhaps provides an option:
>
> "It may also be required during testing, when the test subject depends on
> being instantiated at a specific path. In that case it is best to mock its
> supervisor so that it will forward the Terminated message to the
> appropriate point in the test procedure, enabling the latter to await
> proper deregistration of the name."
> http://doc.akka.io/docs/akka/current/general/addressing.
> html#Reusing_Actor_Paths
>
> Only other thing I can think of is to use akka-persistence-query APIs to
> check the event store, but that's not exactly testing recover.
>
> Thanks for any other ideas.
>
>
>
> On Thu, Dec 1, 2016 at 11:24 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
>
>> Well, it is solved in Akka Typed: removing system.actorOf() and
>> system.stop() is the only way to "solve" this. The reason is that the
>> semantics of these methods are incompatible with the distributed nature of
>> the ActorSystem.
>>
>> Regards, Roland
>>
>> Sent from my iPhone
>>
>> On 2 Dec 2016, at 08:12, Viktor Klang <viktor.kl...@gmail.com> wrote:
>>
>> Good question!
>>
>> The notification of its death hadn't reached the closest of kin (its
>> parent) so they hadn't even put it in the ground when you proposed to
>> create a new one and call it the same thing, so they (rightfully so)
>> objected to that suggestion.
>>
>> This trips people up every now and then, but I've never seen a proposal
>> to solve it.
>>
>> --
>> Cheers,
>> √
>>
>> On Dec 2, 2016 02:44, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>
>>> We have a unit test for a persistent actor, which instantiates a second
>>> instance after waiting for termination of the first.
>>>
>>> watch(definitionReader)
>>>
>>> system.stop(definitionReader)
>>>
>>> expectTerminated(definitionReader)
>>>
>>> val notificationDefinitionReader2 = system.actorOf(actorProps, actorName
>>> )
>>>
>>> This test fails intermittently on Bamboo
>>>
>>> actor name [name1] is not unique!
>>>
>>> Any ideas?
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>>

Re: [akka-user] Unit test for Persistent Actor Replay

2016-12-02 Thread Richard Rodseth
I see that TestActorRef is not an option with PersistentActor

http://doc.akka.io/docs/akka/2.4.1/scala/testing.html#Synchronous_Unit_Testing_with_TestActorRef

This is not the clearest paragraph, but perhaps provides an option:

"It may also be required during testing, when the test subject depends on
being instantiated at a specific path. In that case it is best to mock its
supervisor so that it will forward the Terminated message to the
appropriate point in the test procedure, enabling the latter to await
proper deregistration of the name."
http://doc.akka.io/docs/akka/current/general/addressing.html#Reusing_Actor_Paths

Only other thing I can think of is to use akka-persistence-query APIs to
check the event store, but that's not exactly testing recover.

Thanks for any other ideas.



On Thu, Dec 1, 2016 at 11:24 PM, Roland Kuhn <goo...@rkuhn.info> wrote:

> Well, it is solved in Akka Typed: removing system.actorOf() and
> system.stop() is the only way to "solve" this. The reason is that the
> semantics of these methods are incompatible with the distributed nature of
> the ActorSystem.
>
> Regards, Roland
>
> Sent from my iPhone
>
> On 2 Dec 2016, at 08:12, Viktor Klang <viktor.kl...@gmail.com> wrote:
>
> Good question!
>
> The notification of its death hadn't reached the closest of kin (its
> parent) so they hadn't even put it in the ground when you proposed to
> create a new one and call it the same thing, so they (rightfully so)
> objected to that suggestion.
>
> This trips people up every now and then, but I've never seen a proposal to
> solve it.
>
> --
> Cheers,
> √
>
> On Dec 2, 2016 02:44, "Richard Rodseth" <rrods...@gmail.com> wrote:
>
>> We have a unit test for a persistent actor, which instantiates a second
>> instance after waiting for termination of the first.
>>
>> watch(definitionReader)
>>
>> system.stop(definitionReader)
>>
>> expectTerminated(definitionReader)
>>
>> val notificationDefinitionReader2 = system.actorOf(actorProps, actorName)
>>
>> This test fails intermittently on Bamboo
>>
>> actor name [name1] is not unique!
>>
>> Any ideas?
>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Unit test for Persistent Actor Replay

2016-12-01 Thread Richard Rodseth
We have a unit test for a persistent actor, which instantiates a second
instance after waiting for termination of the first.

watch(definitionReader)

system.stop(definitionReader)

expectTerminated(definitionReader)

val notificationDefinitionReader2 = system.actorOf(actorProps, actorName)

This test fails intermittently on Bamboo

actor name [name1] is not unique!

Any ideas?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Entity streaming

2016-11-21 Thread Richard Rodseth
Good to know! I will have case classes anyway.

On Mon, Nov 21, 2016 at 9:55 AM, Konrad Malawski <
konrad.malaw...@typesafe.com> wrote:

> You have sadly bumped into an weird edge case here, this API was designed
> mostly for streaming case classes in mind,
> as streaming primitives may be a nice hello world but rarely is the thing
> one wants I think...
>
> To workaround the limitation you can provide an explicit marshaller that
> exposes the String as application/json value:
>
> implicit val stringFormat = Marshaller[String, ByteString] { ec ⇒ s ⇒
>   Future.successful {
> List(Marshalling.WithFixedContentType(ContentTypes.`application/json`, () 
> ⇒
>   ByteString("\"" + s + "\"")) // "raw string" to be rendered as json 
> element in our stream must be enclosed by ""
> )
>   }
> }
>
>
> with this it'll work. The issue is that while we do provide marshallers
> for String, they're of type text/plain, which the streaming infrastructure
> looks at and decides "oh, I can't use that, I'm supposed to stream json".
> Eventually it runs out of options and does not marshall anything...
>
> I've made a PR which makes this an explicit error now, and we'll discuss
> in the team how to provide better compile time safety for this.
> Please track the issue 424 for more discussion on this.
>
>
> PS: For case classes, e.g. using spray-json or any json library to
> generate the marshallers (see akka-http-json) it'll work properly.
>
>
> On Mon, Nov 21, 2016 at 3:47 PM, Konrad Malawski <
> konrad.malaw...@typesafe.com> wrote:
>
>> Thanks for reporting Richard - seems there's a bug somewhere, I'm looking
>> into it.
>> Seems it doesn't compose nicely with existing predefined primitive
>> marshallers (like String here), if it was wrapped in a type (say
>> Thing("One") then it works, which is what all our tests were doing).
>>
>> Please track this issue to be informed about progress:
>> https://github.com/akka/akka-http/issues/424
>>
>> --
>> Konrad
>>
>> On Sat, Nov 19, 2016 at 1:55 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> Trying this out for the first time. I get a 200 OK but empty body.
>>>
>>> val eventsRoute = path("events") {
>>>
>>>   get {
>>>
>>> //val results: List[String] = List("One", "Two", "Three")
>>>
>>> val results: Source[String, NotUsed] = Source(List("One", "Two",
>>> "Three"))
>>>
>>> complete(results)
>>>
>>>   }
>>>
>>> }
>>>
>>> The actor that defines the Route mixes in a trait with:
>>>
>>>   val start = ByteString.empty
>>>
>>>   val sep = ByteString("\n")
>>>
>>>   val end = ByteString.empty
>>>
>>>   implicit val jsonStreamingSupport = EntityStreamingSupport.json()
>>>
>>> .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Cheers,
>> Konrad 'ktoso' Malawski
>> Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>
>>
>
>
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Entity streaming

2016-11-18 Thread Richard Rodseth
Trying this out for the first time. I get a 200 OK but empty body.

val eventsRoute = path("events") {

  get {

//val results: List[String] = List("One", "Two", "Three")

val results: Source[String, NotUsed] = Source(List("One", "Two",
"Three"))

complete(results)

  }

}

The actor that defines the Route mixes in a trait with:

  val start = ByteString.empty

  val sep = ByteString("\n")

  val end = ByteString.empty

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()

.withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Resumable Projection

2016-11-18 Thread Richard Rodseth
Actually, I was referring to this

http://doc.akka.io/docs/akka/2.4.12/scala/persistence-query.html#Resumable_projections

We are using Kafka upstream from this. A (persistent) actor A consumes the
Kafka stream. Another persistent actor B uses eventsByTag(A) and runs a
stream with several stages. I'm taking the approach of including the offset
(the eventStore offset, not the Kafka one) in this second stream so I can
update it in B it at the end of the flow.

Thanks

On Fri, Nov 18, 2016 at 4:37 AM, Akka Team <akka.offic...@gmail.com> wrote:

> Hi Richard,
>
> This is not handled by streams itself as it is a feature that Sources must
> support. I think the most common solution people use for this is Kafka.
> There is a Streams connector for Kafka: https://github.com/akka/
> reactive-kafka
>
> -Endre
>
> On Tue, Nov 15, 2016 at 3:04 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Any good examples out there of resumable projections driving non-trivial
>> streams?
>>
>> I'm guessing I will have to keep the stream 1-1 and pass the offset all
>> the way downstream so I can save it at the end?
>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Akka Team
> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
> Twitter: @akkateam
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Timing of eventsByTag() and persist()

2016-11-15 Thread Richard Rodseth
We are seeing some baffling behaviour in a unit test (using
akka-persistence-inmemory).

The test starts actor A and actor B (which starts a stream using
eventsByTag in its
RecoveryCompleted handler).
The test then sends commands to actor A, which results in appropriately
tagged events being persisted.

Without going into all details, the assertion is based on a receiveN on a
probe.

What we see is that some elements at the beginning of the stream get missed
and the assertion fails.

We can make it pass by sending a message to actor A immediately after
starting it, or by putting a Thread.sleep at that point.

To add to the mystery, the test passes when run on its own, but fails when
run with others. That made us suspect issues with
akka-persistence-inmemory, but we do take steps to clean up the event
store, and logging of the stream elements persuaded us that was not the
issue.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Resumable Projection

2016-11-14 Thread Richard Rodseth
Any good examples out there of resumable projections driving non-trivial
streams?

I'm guessing I will have to keep the stream 1-1 and pass the offset all the
way downstream so I can save it at the end?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] StreamSupervisor name

2016-11-11 Thread Richard Rodseth
Thanks.
I actually found the bug that was causing my stream to get stuck. I was
running a Source.queue in an actor, and calling context.become (without
stashing) immediately afterwards.
I found that one of the stream elements was reaching the initial receive
handler, and thus not getting acked.





On Fri, Nov 11, 2016 at 3:48 AM, Akka Team <akka.offic...@gmail.com> wrote:

> AbruptTerminationException happens when the actor running the stream (the
> "graph interpreter") is terminated without the stream first completing or
> failing, for example if terminating an actor system with a running stream
> in it.
>
> The paths of the supervisors are pretty much an implementation detail, but
> perhaps you can provide some addition to the string "end-of-lookup" to give
> you context in the log entry?
>
> --
> Johan
> Akka Team
>
> On Wed, Nov 9, 2016 at 6:33 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I'm trying to write two ScalaTest tests to compare two approaches to
>> driving a Flow that sends out emails. In one approach the flow is sourced
>> using eventsByTag from akka-persistence-query. In the other, I uses
>> SourceQueue.offer after receiving a message via AtLeastOnceDelivery. The
>> test ends with a check of mock JavaMail mailbox size.
>>
>> eventually {
>>
>>   mailBoxSize("ema...@foo.com") should be(expectedEmails)
>>
>> }
>>
>> eventually {
>>
>>   mailBoxSize("ema...@foo.com") should be(expectedEmails)
>>
>> }
>>
>> The test that uses eventsByTag to drive the flow succeeds with 1
>> events. The one that uses SourceQueue.offer gets stuck at 11. In the flow
>> below, the println appears 12 times but the log("end-of-lookup") only
>> appears 11 times.
>>
>>   def flowLookupEmails()(implicit lookup: EmailsByDefinition, ec:
>> ExecutionContext): Flow[AlarmOccurrence, AlarmOccurrenceWithRecipients,
>> NotUsed] = {
>> val parallelism = 5
>> val result = Flow[AlarmOccurrence].mapAsync(parallelism) {
>> occurrence =>
>>   val emailsF: Future[List[String]] = lookup.recipientsForDefinition
>> (occurrence.definitionId)
>>   val lookupResult: Future[AlarmOccurrenceWithRecipients] =
>> emailsF.map { emails =>
>> println(s"# Found emails $emails")
>> AlarmOccurrenceWithRecipients(occurrence, emails)
>>   }
>>
>>   lookupResult
>> }
>> .log("end-of-lookup").withAttributes(Attributes.logLevels(onElement
>> = Logging.InfoLevel))
>> .named("lookup-emails")
>> result
>>   }
>>
>> When I turn on DEBUG logging, I get a lot of noise, but do see
>>
>> [DEBUG] [11/09/2016 08:58:02.827] [OccurrencesIntegrationSpec-ak
>> ka.actor.default-dispatcher-12] [akka://OccurrencesIntegration
>> Spec/user/StreamSupervisor-3/flow-25-0-unknown-operation] stopped
>>
>> [ERROR] [11/09/2016 08:59:02.455] [OccurrencesIntegrationSpec-ak
>> ka.actor.default-dispatcher-21] [akka.stream.Log(akka://Occurr
>> encesIntegrationSpec/user/StreamSupervisor-0)] [end-of-lookup] Upstream
>> failed. (akka.stream.AbruptTerminationException)
>> Any ideas? What are some good techniques to find the cause of the
>> AbruptTerminationException, or name the StreamSupervisors so I know which
>> stream they are associated with?
>>
>> Thanks.
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you 

[akka-user] StreamSupervisor name

2016-11-09 Thread Richard Rodseth
I'm trying to write two ScalaTest tests to compare two approaches to
driving a Flow that sends out emails. In one approach the flow is sourced
using eventsByTag from akka-persistence-query. In the other, I uses
SourceQueue.offer after receiving a message via AtLeastOnceDelivery. The
test ends with a check of mock JavaMail mailbox size.

eventually {

  mailBoxSize("ema...@foo.com") should be(expectedEmails)

}

eventually {

  mailBoxSize("ema...@foo.com") should be(expectedEmails)

}

The test that uses eventsByTag to drive the flow succeeds with 1
events. The one that uses SourceQueue.offer gets stuck at 11. In the flow
below, the println appears 12 times but the log("end-of-lookup") only
appears 11 times.

  def flowLookupEmails()(implicit lookup: EmailsByDefinition, ec:
ExecutionContext): Flow[AlarmOccurrence, AlarmOccurrenceWithRecipients,
NotUsed] = {
val parallelism = 5
val result = Flow[AlarmOccurrence].mapAsync(parallelism) { occurrence =>
  val emailsF: Future[List[String]] =
lookup.recipientsForDefinition(occurrence.definitionId)
  val lookupResult: Future[AlarmOccurrenceWithRecipients] = emailsF.map
{ emails =>
println(s"# Found emails $emails")
AlarmOccurrenceWithRecipients(occurrence, emails)
  }

  lookupResult
}
.log("end-of-lookup").withAttributes(Attributes.logLevels(onElement =
Logging.InfoLevel))
.named("lookup-emails")
result
  }

When I turn on DEBUG logging, I get a lot of noise, but do see

[DEBUG] [11/09/2016 08:58:02.827]
[OccurrencesIntegrationSpec-akka.actor.default-dispatcher-12]
[akka://OccurrencesIntegrationSpec/user/StreamSupervisor-3/flow-25-0-unknown-operation]
stopped

[ERROR] [11/09/2016 08:59:02.455]
[OccurrencesIntegrationSpec-akka.actor.default-dispatcher-21]
[akka.stream.Log(akka://OccurrencesIntegrationSpec/user/StreamSupervisor-0)]
[end-of-lookup] Upstream failed. (akka.stream.AbruptTerminationException)
Any ideas? What are some good techniques to find the cause of the
AbruptTerminationException, or name the StreamSupervisors so I know which
stream they are associated with?

Thanks.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Persistent actor as durable queue

2016-11-01 Thread Richard Rodseth
I was originally imagining a long flow from a akka-streams-kafka
commitableSource, but due to the issue I raised earlier about needing
CommitableMessages throughout the flow, I am considering splitting the
stream in StreamA and StreamB, where StreamA would capture the consumed
messages in a persistent actor. This means I need a way to forward the
received messages to StreamB.

I get the impression that Sink.actorRefWithAck is favored over
ActorSubscriber, and Source.queue is favored over ActorPublisher.

Should Actor B (which runs StreamB) just use eventsByTag to get its Source?
Or would performance be better if the persistent actor implemented
AtLeastOnceDelivery to send the message to ActorB which would offer it to a
SourceQueue it held internally?

Thanks.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Abstracting away CommitableMessage

2016-10-26 Thread Richard Rodseth
I came up with this before realizing my Flow (at least currently) is not
1:1 :(

  def kafka1to1Flow[In,Out](job: Flow[In, Out, _]):
Graph[FlowShape[CommittableMessage[_,In], (Out, CommittableMessage[_,In])],
NotUsed] = {

val result = GraphDSL.create() { implicit b =>

  import GraphDSL.Implicits._

  val toValue: FlowShape[CommittableMessage[_,In], In] =
b.add(Flow[CommittableMessage[_,In]].map(_.record.value))

  val bcast: UniformFanOutShape[CommittableMessage[_,In],
CommittableMessage[_,In]] = b.add(Broadcast[CommittableMessage[_,In]](2))

  val zip: FanInShape2[Out, CommittableMessage[_,In], (Out,
CommittableMessage[_,In])] = b.add(Zip[Out, CommittableMessage[_,In]]())

  bcast.out(0) ~> toValue ~> job ~> zip.in0

  bcast.out(1) ~> zip.in1

  FlowShape(bcast.in, zip.out)

}

result

  }

On Wed, Oct 26, 2016 at 2:08 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> Yes, thanks. I'll explore this.
>
> On Wed, Oct 26, 2016 at 2:04 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
>
>> Yes, indeed: if it is strictly 1:1 and it retains the order of the
>> messages, then this works. Thanks for the sample!
>>
>> Regards,
>>
>> Roland
>>
>> > 26 okt. 2016 kl. 22:12 skrev Itamar Ravid <ira...@iravid.com>:
>> >
>> > Broadcast/Zip if your existing flow is 1:1. Here's a sample:
>> https://github.com/iravid/stream-processing-talk/blob/master
>> /GraphDSL.scala#L86
>> >
>> > --
>> >>>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>>> Search the archives: https://groups.google.com/grou
>> p/akka-user
>> > ---
>> > You received this message because you are subscribed to the Google
>> Groups "Akka User List" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> an email to akka-user+unsubscr...@googlegroups.com.
>> > To post to this group, send email to akka-user@googlegroups.com.
>> > Visit this group at https://groups.google.com/group/akka-user.
>> > For more options, visit https://groups.google.com/d/optout.
>>
>> --
>> >>>>>>>>>>  Read the docs: http://akka.io/docs/
>> >>>>>>>>>>  Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>>  Search the archives: https://groups.google.com/grou
>> p/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Abstracting away CommitableMessage

2016-10-26 Thread Richard Rodseth
Yes, thanks. I'll explore this.

On Wed, Oct 26, 2016 at 2:04 PM, Roland Kuhn  wrote:

> Yes, indeed: if it is strictly 1:1 and it retains the order of the
> messages, then this works. Thanks for the sample!
>
> Regards,
>
> Roland
>
> > 26 okt. 2016 kl. 22:12 skrev Itamar Ravid :
> >
> > Broadcast/Zip if your existing flow is 1:1. Here's a sample:
> https://github.com/iravid/stream-processing-talk/blob/
> master/GraphDSL.scala#L86
> >
> > --
> >>> Read the docs: http://akka.io/docs/
> >>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>> Search the archives: https://groups.google.com/
> group/akka-user
> > ---
> > You received this message because you are subscribed to the Google
> Groups "Akka User List" group.
> > To unsubscribe from this group and stop receiving emails from it, send
> an email to akka-user+unsubscr...@googlegroups.com.
> > To post to this group, send email to akka-user@googlegroups.com.
> > Visit this group at https://groups.google.com/group/akka-user.
> > For more options, visit https://groups.google.com/d/optout.
>
> --
> >>  Read the docs: http://akka.io/docs/
> >>  Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>  Search the archives: https://groups.google.com/
> group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Abstracting away CommitableMessage

2016-10-26 Thread Richard Rodseth
Not sure I followed all that, but thanks.
I don't need a general solution.
I know that my flow is 1 to 1 , say a Flow[SomeEvent, SomeEventRecorded]

Using typeclasses, I would have Flow[SomeEventLike, SomeEventRecordedLike]
and the actual messages would be CommitableMessages, but all the stages
involved in defining the flow would use XYZLike, with implicit evidence
parameters.

Was really hoping I don't have to do that and could somehow nest my
Flow[SomeEvent, SomeEventRecorded], extracting SomeEvent from a commitable
message and putting SomeEventRecorded back into one.



On Wed, Oct 26, 2016 at 10:59 AM, Roland Kuhn <goo...@rkuhn.info> wrote:

> Hi Richard,
>
> unfortunately this is not possible in general: you would need to define
> exactly how the extra data (i.e. the commit token) is transported together
> with the data it belongs to through all possible stages. The answer from
> category theory would be a functor, but how do you define one for
> conflate/expand/fold/etc.? Another approach could be to limit the nested
> flow to only one element at a time so that the metadata stay valid in the
> surrounding context, but there you run into the same problem: what if you
> have stateful stages or how do you determine that the element has been
> dropped? (timeouts are never a real solution, only an approximate
> safe-guard or band-aid)
>
> Notice that none of this is specific to Akka Streams, this is a problem
> for all stream processing that has stages that do not emit one value for
> each input.
>
> Having said all that, it is possible to implement what you want for a
> subset of operations where it makes sense, and in principle type classes or
> extension methods are a suitable vehicle. Unfortunately you’ll have to
> implement the type machinery specifically for all four FlowOps subtypes
> (Source, Flow, SubSource, SubFlow). I have not yet tried out whether the
> partial fix for SI-2712 that will be in Scala 2.12.0 and 2.11.9 is
> sufficient to make this generic—I won’t say “easy” in any case, because the
> types you’ll encounter will certainly be daunting.
>
> I’d love to sink my teeth into this problem, but unfortunately I don’t
> have time for that right now :-(
>
> Regards,
>
> Roland
>
> 26 okt. 2016 kl. 18:49 skrev Richard Rodseth <rrods...@gmail.com>:
>
> Poorly chosen title.
> By abstract away, I meant I want to reuse an existing Flow rather than
> having to rewrite it to use CommitableMessages all the way through.
> So I think I may need something like a nested flow (the "job") in a stage
> that hangs onto the current CommitableMessage.
>
>
> On Wed, Oct 26, 2016 at 9:23 AM, Viktor Klang <viktor.kl...@gmail.com>
> wrote:
>
>> what would happen if that stage would silently discard the
>> CommittableMessage?
>>
>> --
>> Cheers,
>> √
>>
>> On Oct 26, 2016 6:09 PM, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>
>>> I'm planning to use a commitableSource from akka-streams-kafka.
>>>
>>> Is there a way to re-use an existing Flow that knows nothing about
>>> Kafka, extracting the record value and reconstituting the CommitableMessage
>>> at the end of the flow?
>>>
>>> In the past I've experimented with using TypeClasses in my flow
>>> definition signatures, but it got a bit messy.
>>>
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" gro

Re: [akka-user] Abstracting away CommitableMessage

2016-10-26 Thread Richard Rodseth
Poorly chosen title.
By abstract away, I meant I want to reuse an existing Flow rather than
having to rewrite it to use CommitableMessages all the way through.
So I think I may need something like a nested flow (the "job") in a stage
that hangs onto the current CommitableMessage.


On Wed, Oct 26, 2016 at 9:23 AM, Viktor Klang <viktor.kl...@gmail.com>
wrote:

> what would happen if that stage would silently discard the
> CommittableMessage?
>
> --
> Cheers,
> √
>
> On Oct 26, 2016 6:09 PM, "Richard Rodseth" <rrods...@gmail.com> wrote:
>
>> I'm planning to use a commitableSource from akka-streams-kafka.
>>
>> Is there a way to re-use an existing Flow that knows nothing about Kafka,
>> extracting the record value and reconstituting the CommitableMessage at the
>> end of the flow?
>>
>> In the past I've experimented with using TypeClasses in my flow
>> definition signatures, but it got a bit messy.
>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Abstracting away CommitableMessage

2016-10-26 Thread Richard Rodseth
I'm planning to use a commitableSource from akka-streams-kafka.

Is there a way to re-use an existing Flow that knows nothing about Kafka,
extracting the record value and reconstituting the CommitableMessage at the
end of the flow?

In the past I've experimented with using TypeClasses in my flow definition
signatures, but it got a bit messy.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-26 Thread Richard Rodseth
Thanks!

> On Oct 26, 2016, at 2:35 AM, Patrik Nordwall <patrik.nordw...@gmail.com> 
> wrote:
> 
> ask is fine as long as you don't close over something that is not thread-safe 
> in callbacks of the Future. It should be low risk of mistake in mapAsync 
> since you typically only return the Future (after casting/mapTo to the right 
> type).
> 
> I updated the Streams documentation for integration with actors the other day:
> 
> http://doc.akka.io/docs/akka/snapshot/scala/stream/stream-integrations.html
> 
> http://doc.akka.io/docs/akka/snapshot/java/stream/stream-integrations.html
> 
> Regards,
> Patrik
> 
> 
>> On Tue, Oct 25, 2016 at 11:26 PM, Richard Rodseth <rrods...@gmail.com> wrote:
>> I should add that the ask() I would be inserting would actually be to the 
>> ShardRegion for a shared, persistent actor.
>> 
>>> On Tue, Oct 25, 2016 at 11:33 AM, Richard Rodseth <rrods...@gmail.com> 
>>> wrote:
>>> Anyone else? Suppose I need a stage that just looks up something that is 
>>> contained in a (persistent) actor.
>>> Is it good practice to use mapAsync with an ask() to that actor?
>>> Or is there some other stage that would let me use the actor to transform 
>>> one stream element to another without sacrificing backpressure.
>>> This SO question advises against making an actor a Processor.
>>> http://stackoverflow.com/questions/31272267/creating-an-actorpublisher-and-actorsubscriber-with-same-actor
>>> 
>>> 
>>>> On Thu, Oct 20, 2016 at 2:55 PM, Richard Rodseth <rrods...@gmail.com> 
>>>> wrote:
>>>> Short version: is it fair to say the traditional warnings against ask() 
>>>> hold less weight because we have back-pressure?
>>>> 
>>>> In the past I've built an Akka app (no ask() pattern except at the outer 
>>>> edge), and a tool that used Akka Streams (no visible actors except a 
>>>> monitor updated with alsoTo), but am now trying to combine the two 
>>>> concepts.
>>>> 
>>>> Imagine a service which consumes a Kafka topic, sends an email (service 
>>>> returns Future) and updates an aggregate (persistent Actor). I can imagine 
>>>> an infinite stream for this, with mapAsync generating back pressure from 
>>>> the email service, and the persistent actor as a Sink. Email retries could 
>>>> be handled at the Future level, though I'm still a little unclear on how 
>>>> error scenarios would be handled.
>>>> 
>>>> But what if the flow needs to thread through other (persistent) actors on 
>>>> the way to the Email service, perhaps to gather some information for the 
>>>> email?
>>>> 
>>>> Would it make sense to use an ask() here (perhaps in combination with 
>>>> per-request actors). 
>>>> 
>>>> Is it fair to say the traditional warnings against ask() hold less weight 
>>>> because we have back-pressure?
>>>> 
>>>> Could the command to update the the aggregate persistent actor also be 
>>>> issued with an ask() and acked, leading to a more functional style overall?
>>>> 
>>>> Advice or examples appreciated. 
>> 
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
> 
> 
> 
> -- 
> Patrik Nordwall
> Akka Tech Lead
> Lightbend -  Reactive apps on the JVM
> Twitter: @patriknw
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscr

[akka-user] Re: Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-25 Thread Richard Rodseth
I should add that the ask() I would be inserting would actually be to the
ShardRegion for a shared, persistent actor.

On Tue, Oct 25, 2016 at 11:33 AM, Richard Rodseth <rrods...@gmail.com>
wrote:

> Anyone else? Suppose I need a stage that just looks up something that is
> contained in a (persistent) actor.
> Is it good practice to use mapAsync with an ask() to that actor?
> Or is there some other stage that would let me use the actor to transform
> one stream element to another without sacrificing backpressure.
> This SO question advises against making an actor a Processor.
> http://stackoverflow.com/questions/31272267/creating-
> an-actorpublisher-and-actorsubscriber-with-same-actor
>
>
> On Thu, Oct 20, 2016 at 2:55 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Short version: is it fair to say the traditional warnings against ask()
>> hold less weight because we have back-pressure?
>>
>> In the past I've built an Akka app (no ask() pattern except at the outer
>> edge), and a tool that used Akka Streams (no visible actors except a
>> monitor updated with alsoTo), but am now trying to combine the two concepts.
>>
>> Imagine a service which consumes a Kafka topic, sends an email (service
>> returns Future) and updates an aggregate (persistent Actor). I can imagine
>> an infinite stream for this, with mapAsync generating back pressure from
>> the email service, and the persistent actor as a Sink. Email retries could
>> be handled at the Future level, though I'm still a little unclear on how
>> error scenarios would be handled.
>>
>> But what if the flow needs to thread through other (persistent) actors on
>> the way to the Email service, perhaps to gather some information for the
>> email?
>>
>> Would it make sense to use an ask() here (perhaps in combination with
>> per-request actors).
>>
>> Is it fair to say the traditional warnings against ask() hold less weight
>> because we have back-pressure?
>>
>> Could the command to update the the aggregate persistent actor also be
>> issued with an ask() and acked, leading to a more functional style overall?
>>
>> Advice or examples appreciated.
>>
>>
>>
>>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-25 Thread Richard Rodseth
Anyone else? Suppose I need a stage that just looks up something that is
contained in a (persistent) actor.
Is it good practice to use mapAsync with an ask() to that actor?
Or is there some other stage that would let me use the actor to transform
one stream element to another without sacrificing backpressure.
This SO question advises against making an actor a Processor.
http://stackoverflow.com/questions/31272267/creating-an-actorpublisher-and-actorsubscriber-with-same-actor


On Thu, Oct 20, 2016 at 2:55 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> Short version: is it fair to say the traditional warnings against ask()
> hold less weight because we have back-pressure?
>
> In the past I've built an Akka app (no ask() pattern except at the outer
> edge), and a tool that used Akka Streams (no visible actors except a
> monitor updated with alsoTo), but am now trying to combine the two concepts.
>
> Imagine a service which consumes a Kafka topic, sends an email (service
> returns Future) and updates an aggregate (persistent Actor). I can imagine
> an infinite stream for this, with mapAsync generating back pressure from
> the email service, and the persistent actor as a Sink. Email retries could
> be handled at the Future level, though I'm still a little unclear on how
> error scenarios would be handled.
>
> But what if the flow needs to thread through other (persistent) actors on
> the way to the Email service, perhaps to gather some information for the
> email?
>
> Would it make sense to use an ask() here (perhaps in combination with
> per-request actors).
>
> Is it fair to say the traditional warnings against ask() hold less weight
> because we have back-pressure?
>
> Could the command to update the the aggregate persistent actor also be
> issued with an ask() and acked, leading to a more functional style overall?
>
> Advice or examples appreciated.
>
>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-21 Thread Richard Rodseth
Isn't your comment about "messes up the pseudo-single-threading invariant
of Actors" more about not closing over mutable state? In any case, you
can't avoid Futures if you're using Slick or HTTP clients, for example.

On Fri, Oct 21, 2016 at 7:32 AM, Richard Rodseth <rrods...@gmail.com> wrote:

> I've had no problems using Futures within actors and piping the result to
> the actor, after mapping. I have had problems when an actor gets flooded
> and you get timeouts because you're simply starting up too many Futures.
> That, I think, is where back-pressure changes the landscape.
> Thanks for the Requester link.
>
> On Fri, Oct 21, 2016 at 4:55 AM, Justin du coeur <jduco...@gmail.com>
> wrote:
>
>> On Thu, Oct 20, 2016 at 5:55 PM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> Short version: is it fair to say the traditional warnings against ask()
>>> hold less weight because we have back-pressure?
>>>
>>
>> Well, keep in mind that at least some of the usual warnings against ask()
>> have nothing obviously to do with that sort of thing.  The biggest
>> traditional problem with ask() is really a warning about using Future --
>> it's always dangerous to map/flatMap on a Future inside of an Actor,
>> because the Future will generally execute in parallel rather than properly
>> synchronized in receive; as a result, Future messes up the
>> pseudo-single-threading invariant of Actors.  And since a for comprehension
>> of ask() is simply mapping over Futures, it's automatically a bit risky.  I
>> don't see how back-pressure helps with that.
>>
>> I haven't thought about how it interacts with streams, but fixing the
>> ask() problem is the whole point of the Requester library
>> <https://github.com/jducoeur/Requester>. You might want to give that a
>> look, and see if it helps with your needs...
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-21 Thread Richard Rodseth
I've had no problems using Futures within actors and piping the result to
the actor, after mapping. I have had problems when an actor gets flooded
and you get timeouts because you're simply starting up too many Futures.
That, I think, is where back-pressure changes the landscape.
Thanks for the Requester link.

On Fri, Oct 21, 2016 at 4:55 AM, Justin du coeur <jduco...@gmail.com> wrote:

> On Thu, Oct 20, 2016 at 5:55 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Short version: is it fair to say the traditional warnings against ask()
>> hold less weight because we have back-pressure?
>>
>
> Well, keep in mind that at least some of the usual warnings against ask()
> have nothing obviously to do with that sort of thing.  The biggest
> traditional problem with ask() is really a warning about using Future --
> it's always dangerous to map/flatMap on a Future inside of an Actor,
> because the Future will generally execute in parallel rather than properly
> synchronized in receive; as a result, Future messes up the
> pseudo-single-threading invariant of Actors.  And since a for comprehension
> of ask() is simply mapping over Futures, it's automatically a bit risky.  I
> don't see how back-pressure helps with that.
>
> I haven't thought about how it interacts with streams, but fixing the
> ask() problem is the whole point of the Requester library
> <https://github.com/jducoeur/Requester>. You might want to give that a
> look, and see if it helps with your needs...
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Combining Kafka, Akka Streams and (Persistent) Actors

2016-10-20 Thread Richard Rodseth
Short version: is it fair to say the traditional warnings against ask()
hold less weight because we have back-pressure?

In the past I've built an Akka app (no ask() pattern except at the outer
edge), and a tool that used Akka Streams (no visible actors except a
monitor updated with alsoTo), but am now trying to combine the two concepts.

Imagine a service which consumes a Kafka topic, sends an email (service
returns Future) and updates an aggregate (persistent Actor). I can imagine
an infinite stream for this, with mapAsync generating back pressure from
the email service, and the persistent actor as a Sink. Email retries could
be handled at the Future level, though I'm still a little unclear on how
error scenarios would be handled.

But what if the flow needs to thread through other (persistent) actors on
the way to the Email service, perhaps to gather some information for the
email?

Would it make sense to use an ask() here (perhaps in combination with
per-request actors).

Is it fair to say the traditional warnings against ask() hold less weight
because we have back-pressure?

Could the command to update the the aggregate persistent actor also be
issued with an ask() and acked, leading to a more functional style overall?

Advice or examples appreciated.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Testing Akka Persistence Recovery

2016-10-13 Thread Richard Rodseth
Thank you. That did the trick.

  watch(subscriber1)
  system.stop(subscriber1)
  expectTerminated(subscriber1)
  val subscriber2 = system.actorOf(props1, "name1") // New actor, same
name
  subscriber2 ! Subscriber.GetState
  val result2 = expectMsg(...something...)

On Thu, Oct 13, 2016 at 12:01 AM, Patrik Nordwall <patrik.nordw...@gmail.com
> wrote:

> The actor's name and persistenceId doesn't have to be the same, but if you
> need that you must wait until it has been terminated before starting the
> new instance
>
> watch(subscriber1)
> expectTerminated(subscriber2)
>
> On Thu, Oct 13, 2016 at 7:10 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Thanks, but if I do this:
>>
>>   system.stop(subscriber1)
>>
>>   val subscriber2 = system.actorOf(props1, "name1")
>>
>> I get an error that "name1" is not unique. The actor's persistence id is:
>>
>>   override val persistenceId: String = "subscriber-" + self.path.name
>>
>> On Wed, Oct 12, 2016 at 9:59 PM, Patrik Nordwall <
>> patrik.nordw...@gmail.com> wrote:
>>
>>> You can stop it and start a new actor with the same persistenceId.
>>> /Patrik
>>> tors 13 okt. 2016 kl. 06:33 skrev Richard Rodseth <rrods...@gmail.com>:
>>>
>>>> I've been able to test recovery by using the in-memory journal and
>>>> sending a "bomb" message to the actor, which is handled by throwing an
>>>> exception :
>>>>
>>>> myActorRef ! DoSomething
>>>> myActorRef ! "bomb"
>>>> myActorRef ! GetState
>>>> expectMsg(MyActorState(...))
>>>>
>>>> Is there any way I can do this without having to add the special "bomb"
>>>> message to the the Actor.
>>>>
>>>> Thanks.
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>>> urrent/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>>> p/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to akka-user@googlegroups.com.
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
> --
>

Re: [akka-user] Testing Akka Persistence Recovery

2016-10-12 Thread Richard Rodseth
Thanks, but if I do this:

  system.stop(subscriber1)

  val subscriber2 = system.actorOf(props1, "name1")

I get an error that "name1" is not unique. The actor's persistence id is:

  override val persistenceId: String = "subscriber-" + self.path.name

On Wed, Oct 12, 2016 at 9:59 PM, Patrik Nordwall <patrik.nordw...@gmail.com>
wrote:

> You can stop it and start a new actor with the same persistenceId.
> /Patrik
> tors 13 okt. 2016 kl. 06:33 skrev Richard Rodseth <rrods...@gmail.com>:
>
>> I've been able to test recovery by using the in-memory journal and
>> sending a "bomb" message to the actor, which is handled by throwing an
>> exception :
>>
>> myActorRef ! DoSomething
>> myActorRef ! "bomb"
>> myActorRef ! GetState
>> expectMsg(MyActorState(...))
>>
>> Is there any way I can do this without having to add the special "bomb"
>> message to the the Actor.
>>
>> Thanks.
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>> current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Testing Akka Persistence Recovery

2016-10-12 Thread Richard Rodseth
I've been able to test recovery by using the in-memory journal and sending
a "bomb" message to the actor, which is handled by throwing an exception :

myActorRef ! DoSomething
myActorRef ! "bomb"
myActorRef ! GetState
expectMsg(MyActorState(...))

Is there any way I can do this without having to add the special "bomb"
message to the the Actor.

Thanks.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka HTTP complete Future obtained from ask, with custom marshalling

2016-10-07 Thread Richard Rodseth
You're right the types are different, and PimpedResult is not a case class so 
pattern matching doesn't work out. I think I must abandon this approach and 
mimic Heiko's.
Was hoping to ease the port from Spray but
this BinTray contribution is too complex.

Sent from my phone - will be brief

> On Oct 7, 2016, at 1:38 PM, Akka Team <akka.offic...@gmail.com> wrote:
> 
> To be fair you are doing something pretty complex with them, but I agree, the 
> magnet pattern definitely can be humbling.
> 
> Did the code that worked just fine earlier really deal with the exact same 
> type (tuple with statuscode and APIError)?
> 
> --
> Johan
> Akka Team
> 
>> On Fri, Oct 7, 2016 at 1:30 PM, Richard Rodseth <rrods...@gmail.com> wrote:
>> Heiko has a different approach here
>> https://github.com/hseeberger/reactive-flows/blob/master/src/main/scala/de/heikoseeberger/reactiveflows/Api.scala
>> 
>> onSuccess(flowFacade ? addFlow) {
>>   case bc: BadCommand => complete(BadRequest -> bc)
>>   case fe: FlowExists => complete(Conflict -> fe)
>>   case fa: FlowAdded  => completeCreated(fa.desc.name, fa)
>> }
>> 
>> If I recall correctly, the right arrow is creating a tuple. This seems like 
>> a nice approach, though I suppose it also relies on the untyped nature of Ask
>> In my case, if I try something like
>>   
>> onSuccess(requestHandler ? NotificationsRequestHandler.AskForStatus) { 
>> case apiError: APIError => complete(StatusCodes.BadRequest -> 
>> apiError)
>> 
>> ...
>> 
>>   }
>> 
>> my implicit conversions to ToResponseMarshallable aren't found, even though 
>> calling Marshall(reply).to[HttpResponse] a few lines earlier is just fine.
>> 
>> Implicit conversions sure are humbling.
>> 
>> 
>>> On Fri, Oct 7, 2016 at 11:59 AM, Viktor Klang <viktor.kl...@gmail.com> 
>>> wrote:
>>> WARNING: I may very well be incorrect here
>>> 
>>> Options:
>>> A: switch to the onComplete directive
>>> B: map to an Either[Result, PimpedResult]?
>>> 
>>>> On Fri, Oct 7, 2016 at 6:38 PM, Richard Rodseth <rrods...@gmail.com> wrote:
>>>> That's the trouble. Depending on whether the actor calls withStatusCode, 
>>>> it could be a Result or a PimpedResult. Both can be marshalled.
>>>> 
>>>> In the Spray application we completed the request inside a per-request 
>>>> actor. That is no longer possible, hence using Ask.
>>>> 
>>>>> On Fri, Oct 7, 2016 at 11:18 AM, Akka Team <akka.offic...@gmail.com> 
>>>>> wrote:
>>>>> Hi Richard,
>>>>> 
>>>>> You can combine ask with mapTo to cast it to the right type (or fail the 
>>>>> future if it does not have that type), see the docs here: 
>>>>> http://doc.akka.io/docs/akka/2.4/scala/futures.html#use-with-actors
>>>>> 
>>>>> --
>>>>> Johan
>>>>> Akka Team
>>>>> 
>>>>>> On Fri, Oct 7, 2016 at 10:55 AM, Richard Rodseth <rrods...@gmail.com> 
>>>>>> wrote:
>>>>>> Continuing my struggles to port something we did with Spray, using
>>>>>> 
>>>>>> // See https://bitbucket.org/binarycamp/spray-contrib/src
>>>>>> 
>>>>>> I have resolved my implicit conversion errors to the point where I can 
>>>>>> execute the following (Result[T] is an alias for Either[APIError, T]):
>>>>>> 
>>>>>>   val successResult: Result[NotificationsStatusDTO] = 
>>>>>> Right(NotificationsStatusDTO("foo"))
>>>>>> 
>>>>>>   val successResponseFuture: Future[HttpResponse] = 
>>>>>> Marshal(successResult).to[HttpResponse]
>>>>>> 
>>>>>>   val successResponse = Await.result(successResponseFuture, 1.second) // 
>>>>>> don't block in non-test code!
>>>>>> 
>>>>>>   println(s"Success response $successResponse")
>>>>>> 
>>>>>>   val successResult2: Result[NotificationsStatusDTO] = 
>>>>>> Right(NotificationsStatusDTO("foo"))
>>>>>> 
>>>>>>   val resultWithStatusCode: PimpedResult[(StatusCode, 
>>>>>> NotificationsStatusDTO)] = 
>>>>>> successResult2.withStatusCode(StatusCodes.

Re: [akka-user] Akka HTTP complete Future obtained from ask, with custom marshalling

2016-10-07 Thread Richard Rodseth
Heiko has a different approach here
https://github.com/hseeberger/reactive-flows/blob/master/src/main/scala/de/heikoseeberger/reactiveflows/Api.scala

onSuccess(flowFacade ? addFlow) {
case bc: BadCommand => complete(BadRequest -> bc)
case fe: FlowExists => complete(Conflict -> fe)
case fa: FlowAdded => completeCreated(fa.desc.name, fa)
}

If I recall correctly, the right arrow is creating a tuple. This seems like
a nice approach, though I suppose it also relies on the untyped nature of
Ask
In my case, if I try something like

onSuccess(requestHandler ? NotificationsRequestHandler.AskForStatus) {

case apiError: APIError => complete(StatusCodes.BadRequest ->
apiError)

...

  }

my implicit conversions to ToResponseMarshallable aren't found, even though
calling Marshall(reply).to[HttpResponse] a few lines earlier is just fine.

Implicit conversions sure are humbling.

On Fri, Oct 7, 2016 at 11:59 AM, Viktor Klang <viktor.kl...@gmail.com>
wrote:

> WARNING: I may very well be incorrect here
>
> Options:
> A: switch to the onComplete directive
> B: map to an Either[Result, PimpedResult]?
>
> On Fri, Oct 7, 2016 at 6:38 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> That's the trouble. Depending on whether the actor calls withStatusCode,
>> it could be a Result or a PimpedResult. Both can be marshalled.
>>
>> In the Spray application we completed the request inside a per-request
>> actor. That is no longer possible, hence using Ask.
>>
>> On Fri, Oct 7, 2016 at 11:18 AM, Akka Team <akka.offic...@gmail.com>
>> wrote:
>>
>>> Hi Richard,
>>>
>>> You can combine ask with mapTo to cast it to the right type (or fail
>>> the future if it does not have that type), see the docs here:
>>> http://doc.akka.io/docs/akka/2.4/scala/futures.html#use-with-actors
>>>
>>> --
>>> Johan
>>> Akka Team
>>>
>>> On Fri, Oct 7, 2016 at 10:55 AM, Richard Rodseth <rrods...@gmail.com>
>>> wrote:
>>>
>>>> Continuing my struggles to port something we did with Spray, using
>>>>
>>>> // See https://bitbucket.org/binarycamp/spray-contrib/src
>>>>
>>>> I have resolved my implicit conversion errors to the point where I can
>>>> execute the following (Result[T] is an alias for Either[APIError, T]):
>>>>
>>>>   val successResult: Result[NotificationsStatusDTO] =
>>>> Right(NotificationsStatusDTO("foo"))
>>>>
>>>>   val successResponseFuture: Future[HttpResponse] = Marshal(
>>>> successResult).to[HttpResponse]
>>>>
>>>>   val successResponse = Await.result(successResponseFuture, 1.second) //
>>>> don't block in non-test code!
>>>>
>>>>   println(s"Success response $successResponse")
>>>>
>>>>   val successResult2: Result[NotificationsStatusDTO] =
>>>> Right(NotificationsStatusDTO("foo"))
>>>>
>>>>   val resultWithStatusCode: PimpedResult[(StatusCode,
>>>> NotificationsStatusDTO)] = successResult2.withStatusCode(
>>>> StatusCodes.Accepted)
>>>>
>>>>   val successResponseWithStatusCodeFuture: Future[HttpResponse] =
>>>> Marshal(resultWithStatusCode).to[HttpResponse]
>>>>
>>>>   val successResponseWithStatusCode = Await.result(successResponseWi
>>>> thStatusCodeFuture, 1.second) // don't block in non-test code!
>>>>
>>>>   println(s"Success with status code $successResponseWithStatusCode")
>>>>
>>>> But I have not been able to figure out how to use the onSuccess
>>>> directive (or another if more appropriate) if the response (either a Result
>>>> or a PimpedResult as above) is coming from an Ask.
>>>>
>>>> This obviously doesn't work:
>>>>
>>>>   onSuccess(requestHandler ? 
>>>> NotificationsRequestHandler.AskForStatus)
>>>> {
>>>>
>>>> case _ => complete(x)
>>>>
>>>>   }
>>>>
>>>> Any pointers?
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>>>> urrent/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/grou
>>>> p/akka-user
>>>> ---
>>

Re: [akka-user] Re: Organizing Route Directives

2016-10-07 Thread Richard Rodseth
Isn't it true that your routes need to be in a trait t be able to make use
of the Routing Testkit?

On Fri, Oct 7, 2016 at 12:08 PM,  wrote:

> Thank you for the response Johan but I'm not sure that really answers my
> question but perhaps I can ask some other questions that may help to get
> what I need.
>
> So far I have been operating on the assumption that I can have only one
> route function at the top most level to define all of the routing
> information for my server. This is due to the Http object taking in a
> single route object during a bindAndHandle like so:
>
> ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
> final Flow routeFlow = new 
> RestApiDirectives().createRoute().flow(actorSystem, materializer);
>
> CompletionStage httpBinding = http.bindAndHandle(routeFlow, 
> ConnectHttp.toHost(httpHost, httpPort), materializer);
>
>
> This leads me to having a class that extends some directives class with a
> single createRoute() function that contains all of the routing logic for
> the entire server. Example:
>
> public class RestApiDirectives extends AllDirectives {
> public Route createRoute() {
> return ...;
> }
> }
>
>
> So if I want to break out my route implementation into further classes
> then how do I bring those new route functions into the main createRoute? So
> far I've done it by a crazy chain of inheritance similar to how Akka does
> it. However, just now I did try out simply calling *new *on the owning
> class and making the function call instead. Example...
>
> public Route create() {
> return route(
> new SubsystemA().createRoute(),
> new SubsystemB().createRoute(),
> new SubsystemC().createRoute(),
> new SubsystemD().createRoute()
> );
> }
>
>
> This actually seems to work but feels as equally terrible as the crazy
> chain of inheritance. Why? This is because any time I have an instance of a
> class I assume there is some state information being maintained that
> matters. If I need to instantiate an initial object to create a route it
> stands to reason that any further work done inside that route needs to pass
> along it's state to the next object instance to function correctly. The
> fact that I can arbitrarily instantiate new objects with route functions
> and call them means that there is no state being maintained across
> instances that are unique to the original routing object. So then why are
> the directives not simply static functions? It would be a whole lot simpler
> to understand, organizationally, that I can create as many classes with
> static routing functions as I want and simply chain them together. The
> current approach just causes lots of confusion and considerable wasted
> time, especially when there is no documentation or example code anywhere
> that clearly states that you can link routing functions in this way.
>
> So I guess I've found my own answer here but please consider fixing the
> documentation to make this information very clear and add some real world
> examples in how a complicated routing structure can be achieved that goes
> beyond a single class with one createRoute() function.
>
> Thanks for your time!
>
> On Friday, October 7, 2016 at 9:38:45 AM UTC-7, Akka Team wrote:
>>
>> In general, don't try to mimic the way it is done in Akka HTTP itself
>> unless you actually have the same requirements.  (To support both mixin and
>> static imports, dealing with both a Scala and Java API etc)
>>
>> Modularising the routes is not different from modularising other kinds of
>> codebases, you have options ranging from classes and objects to functions,
>> each having their own pros and cons. If they have dependencies you have to
>> provide that in some way, which you can either do directly "hardcoded" or
>> by some form of injection (note that I'm using inject in the wider sense of
>> "someone else providing a thing I need" rather than "use a DI-framework"
>> here) depending on your needs.
>>
>> Think about how you would usually separate concerns (usually composition
>> over inheritance is a good idea here) and provide their respective
>> dependencies to them. Also, if you already do separate concerns in your
>> codebase, think about using the same way for routes rather than introducing
>> an entirely different way to do that.
>>
>> --
>> Johan
>> Akka Team
>>
>> On Fri, Oct 7, 2016 at 11:09 AM,  wrote:
>>
>>> Yes I understand that much and I have broken up my implementation into
>>> various Route producing functions as you've suggested. The problem I have
>>> is that it appears for all of this to work I must have all of these
>>> functions described in a singular class namespace. This appears to be due
>>> to some inherent state being maintained that can't be easily passed from
>>> one class to another.
>>>
>>> Either you define all of the Route functions in a single class 

Re: [akka-user] Akka HTTP complete Future obtained from ask, with custom marshalling

2016-10-07 Thread Richard Rodseth
That's the trouble. Depending on whether the actor calls withStatusCode, it
could be a Result or a PimpedResult. Both can be marshalled.

In the Spray application we completed the request inside a per-request
actor. That is no longer possible, hence using Ask.

On Fri, Oct 7, 2016 at 11:18 AM, Akka Team <akka.offic...@gmail.com> wrote:

> Hi Richard,
>
> You can combine ask with mapTo to cast it to the right type (or fail the
> future if it does not have that type), see the docs here:
> http://doc.akka.io/docs/akka/2.4/scala/futures.html#use-with-actors
>
> --
> Johan
> Akka Team
>
> On Fri, Oct 7, 2016 at 10:55 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Continuing my struggles to port something we did with Spray, using
>>
>> // See https://bitbucket.org/binarycamp/spray-contrib/src
>>
>> I have resolved my implicit conversion errors to the point where I can
>> execute the following (Result[T] is an alias for Either[APIError, T]):
>>
>>   val successResult: Result[NotificationsStatusDTO] =
>> Right(NotificationsStatusDTO("foo"))
>>
>>   val successResponseFuture: Future[HttpResponse] = Marshal(successResult
>> ).to[HttpResponse]
>>
>>   val successResponse = Await.result(successResponseFuture, 1.second) //
>> don't block in non-test code!
>>
>>   println(s"Success response $successResponse")
>>
>>   val successResult2: Result[NotificationsStatusDTO] =
>> Right(NotificationsStatusDTO("foo"))
>>
>>   val resultWithStatusCode: PimpedResult[(StatusCode,
>> NotificationsStatusDTO)] = successResult2.withStatusCode(
>> StatusCodes.Accepted)
>>
>>   val successResponseWithStatusCodeFuture: Future[HttpResponse] =
>> Marshal(resultWithStatusCode).to[HttpResponse]
>>
>>   val successResponseWithStatusCode = Await.result(successResponseWi
>> thStatusCodeFuture, 1.second) // don't block in non-test code!
>>
>>   println(s"Success with status code $successResponseWithStatusCode")
>>
>> But I have not been able to figure out how to use the onSuccess directive
>> (or another if more appropriate) if the response (either a Result or a
>> PimpedResult as above) is coming from an Ask.
>>
>> This obviously doesn't work:
>>
>>   onSuccess(requestHandler ? 
>> NotificationsRequestHandler.AskForStatus)
>> {
>>
>> case _ => complete(x)
>>
>>   }
>>
>> Any pointers?
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka HTTP complete Future obtained from ask, with custom marshalling

2016-10-07 Thread Richard Rodseth
Continuing my struggles to port something we did with Spray, using

// See https://bitbucket.org/binarycamp/spray-contrib/src

I have resolved my implicit conversion errors to the point where I can
execute the following (Result[T] is an alias for Either[APIError, T]):

  val successResult: Result[NotificationsStatusDTO] =
Right(NotificationsStatusDTO("foo"))

  val successResponseFuture: Future[HttpResponse] = Marshal(successResult
).to[HttpResponse]

  val successResponse = Await.result(successResponseFuture, 1.second) //
don't block in non-test code!

  println(s"Success response $successResponse")

  val successResult2: Result[NotificationsStatusDTO] =
Right(NotificationsStatusDTO("foo"))

  val resultWithStatusCode: PimpedResult[(StatusCode,
NotificationsStatusDTO)] = successResult2
.withStatusCode(StatusCodes.Accepted)

  val successResponseWithStatusCodeFuture: Future[HttpResponse] = Marshal(
resultWithStatusCode).to[HttpResponse]

  val successResponseWithStatusCode = Await.result(
successResponseWithStatusCodeFuture, 1.second) // don't block in non-test
code!

  println(s"Success with status code $successResponseWithStatusCode")

But I have not been able to figure out how to use the onSuccess directive
(or another if more appropriate) if the response (either a Result or a
PimpedResult as above) is coming from an Ask.

This obviously doesn't work:

  onSuccess(requestHandler ?
NotificationsRequestHandler.AskForStatus) {

case _ => complete(x)

  }

Any pointers?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] DDD/CQRS - Modifying write model solely to support read model projection

2016-09-27 Thread Richard Rodseth
I asked this over on the DDD/CQRS list, but didn't get a reply, so I
thought I'd try here.

Imagine a system to notify users about alarms. I'm using Akka Persistence
which supports streaming projections from the event store to the read side.

I'm considering three aggregates, (alarm)Definition, Site and User.
Definitions and Users each have a site path which associates them with
sites. When a user is opted in we can connect it to accessible sites. When
a definition is created or updated, we can connect it to applicable sites.

The definition aggregate may in fact not be needed for notifications, since
when an alarm occurs, the occurrence already knows about the site, so
navigation could start with that aggregate and dispatch to the opted-in
users.

However, the read model may need to support "get users(subscribers) by
definition". I could extend the write path so that when a user is opted in
it is associated not only with accessible sites, but also with definitions
applicable to those sites. Or perhaps the site aggregate coud make the
association internally between users and definitions. Either way, that
logic would most likely be there solely to support the read model
projection. Is that unusual or a red flag of any sort?

Thoughts?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Debugging marshalling implicits

2016-09-20 Thread Richard Rodseth
Thanks. It appears to be the absence of an Either marshaller ( Result[T] is
an alias for Either[APIError,T] )
Is that something that was available in Spray, but not in Akka HTTP ?
I see this got merged:
https://github.com/akka/akka/pull/20274
but it's an Unmarshaller.

On Tue, Sep 20, 2016 at 3:18 AM, 'Johannes Rudolph' via Akka User List <
akka-user@googlegroups.com> wrote:

> Hi,
>
> On Tuesday, September 13, 2016 at 4:26:55 PM UTC+2, rrodseth wrote:
> > Is this the right way to debug the missing conversion?
> > val prm = implicitly[Future[PimpedResult[(StatusCode,
> Result[StatusDTO])]] => ToResponseMarshallable]
>
>
> Try
>
> implicitly[ToResponseMarshaller[Future[PimpedResult[(StatusCode,
> Result[StatusDTO])]]]
>
>
> then
>
>
> implicitly[ToResponseMarshaller[PimpedResult[(StatusCode,
> Result[StatusDTO])]]
>
>
> then
>
> implicitly[ToEntityMarshaller[Result[StatusDTO]]] // required by
> APICustomMarshallers.statusMarshaller
>
> or just
>
> statusMarshaller[Result[StatusDTO]]]
>
>
> and see which one still succeeds. Then you need to work back from the
> simple ones to the more complex types, trying to get those to work.
>
> HTH
> Johannes
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Debugging marshalling implicits

2016-09-13 Thread Richard Rodseth
Bump.

Is this the right way to debug the missing conversion?
val prm = implicitly[Future[PimpedResult[(StatusCode, Result[StatusDTO])]]
=> ToResponseMarshallable]


On Wed, Sep 7, 2016 at 8:43 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> Can anyone help me with this please?
> I have the following in my route:
>
> val result = (requestHandler ? RequestHandler.AskForStatus).
> mapTo[PimpedResult[(StatusCode, Result[StatusDTO])]]
>
> onSuccess(result) {
>   case _ => complete(result)
> }
>
> The mapTo is really just to assist my debugging. The result could be a
> Result[StatusDTO] or a PimpedResult.
> Anyway, I get the following error on the complete call:
>
> type mismatch; found : 
> scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
> Result[StatusDTO])]] (which expands to) scala.concurrent.Future[
> PimpedResult[(akka.http.scaladsl.model.StatusCode,
> scala.util.Either[APIError,StatusDTO])]] required: akka.http.scaladsl.
> marshalling.ToResponseMarshallable
>
> I added the following after the mapTo:
>
> val prm = implicitly[Future[PimpedResult[(StatusCode,
> Result[StatusDTO])]] => ToResponseMarshallable]
>
> and that line gets this error:
>
> Multiple markers at this line:
>
> not enough arguments for method implicitly: (implicit e:
> scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
> Result[StatusDTO])]] ⇒ akka.http.scaladsl.marshalling.
> ToResponseMarshallable)scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
> Result[StatusDTO])]] ⇒ akka.http.scaladsl.marshalling.ToResponseMarshallable.
> Unspecified value parameter e.
> No implicit view available from scala.concurrent.Future[
> PimpedResult[(akka.http.scaladsl.model.StatusCode, Result[StatusDTO])]] ⇒
> akka.http.scaladsl.marshalling.ToResponseMarshallable.
>
> Here is the custom marshaller. I know it's in scope because the
> errorMarshaller works in the case where the response is Result[StatusDTO]
> rather than PimpedResult
>
> // See https://bitbucket.org/binarycamp/spray-contrib/src
> trait APICustomMarshallers {
>
>   implicit def errorMarshaller[T](
> implicit translator: ErrorTranslator[T],
> statusCodeMapper: ErrorToStatusCodeMapper,
> m: ToEntityMarshaller[T]): ToResponseMarshaller[APIError] = {
>
> val base = PredefinedToResponseMarshallers.
> fromStatusCodeAndHeadersAndValue
> val result = base.compose {
>   error: APIError => 
> (statusCodeMapper.orElse(ErrorToStatusCodeMapper.Default)(error),
> DefaultHttpHeaders.NoCacheHeaders, translator(error))
> }
> result
>   }
>
>   implicit def statusMarshaller[T](implicit m: ToEntityMarshaller[T]):
> ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {
> val base = PredefinedToResponseMarshallers.
> fromStatusCodeAndHeadersAndValue // Needs the entity marshaller
> val result = base.compose {
>   pimpedResult: PimpedResult[(StatusCode, T)] =>
> pimpedResult.result match {
>   case Right(response) => (response._1, 
> DefaultHttpHeaders.NoCacheHeaders,
> response._2)
>   case _   => throw new RuntimeException("StatusMarshaller
> got invalid PimpedResult")
> }
> }
> result
>   }
> }
>
> Thanks for any help.
>
>
>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Testing routes and marshalling

2016-09-12 Thread Richard Rodseth
Thank you. I got it working with a custom test actor.
If you get a chance to look at the follow-up topic "Debugging marshalling
implicits", it would be much appreciated
A shorter question would be:

Is this the right way to debug when a type cannot be marshalled?
val prm = implicitly[Future[PimpedResult[(StatusCode, Result[StatusDTO])]]
=> ToResponseMarshallable]


On Fri, Sep 9, 2016 at 8:26 AM, Akka Team <akka.offic...@gmail.com> wrote:

> Hi Richard,
>
> The HTTP Testkit expects to have gotten the reply already when check
> executes, so that is why it does not work.
>
> You can use the autopilot feature of the TestProbe or a custom test actor
> responding the way you want in the test, it could also keep incoming
> requests for later assertions.
>
> --
> Johan
> Akka Team
>
> On Wed, Sep 7, 2016 at 1:47 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Oy. I'm getting tied up in knots. I guess the requestHandlerProbe.reply
>> won't work without a proper sender. I usually put my route definition in an
>> actor, but moved it to a trait so that it could be used by the test kit.
>> But I did it like this
>>
>> def route(requestHandler: ActorRef) = ...
>>
>> Can anyone point me at a sample which uses the ask pattern in routes, and
>> also tests the routes with the ScalatestRouteTest?
>>
>> On Tue, Sep 6, 2016 at 12:56 PM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> So the example here is not very realistic because the route does not
>>> depend on any actors:
>>>
>>> http://doc.akka.io/docs/akka/2.4.9/scala/http/routing-dsl/te
>>> stkit.html#Usage
>>>
>>> I have a status api that returns Either[APIError, StatusDTO].
>>>
>>> val statusRoute = path("status") {
>>>
>>>   get {
>>>
>>> handleErrors {
>>>
>>>   onSuccess(requestHandler ? RequestHandler.AskForStatus) {
>>>
>>>   complete(result)
>>>
>>>   }
>>>
>>> }
>>>
>>>   }
>>>
>>> }
>>>  Pondering whether to make a mock actor for requestHandler, or use a
>>> probe.
>>>
>>> This works:
>>>
>>>   Get("/status") ~> myRoute ~> check {
>>>
>>> requestHandlerProbe.expectMsgClass(100 millis,
>>> RequestHandler.AskForStatus.getClass)
>>>
>>>   }
>>>
>>> This times out
>>>
>>>   Get("/status") ~> myRoute ~> check {
>>>
>>> requestHandlerProbe.expectMsgClass(100 millis,
>>> RequestHandler.AskForStatus.getClass)
>>>
>>>  val result: Either[APIError, StatusDTO] = Right(StatusDTO("message"
>>> ))
>>>
>>> requestHandlerProbe.reply(result)
>>>
>>> println(this.responseEntity)
>>>
>>>   }
>>>
>>> Any errors in my use of probe.reply ? Would it be better to test
>>> response marshalling on its own and limit routing tests to expectMsg?
>>>
>>>
>>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Akka Team
> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
> Twitter: @akkateam
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Debugging marshalling implicits

2016-09-07 Thread Richard Rodseth
Can anyone help me with this please?
I have the following in my route:

val result = (requestHandler ?
RequestHandler.AskForStatus).mapTo[PimpedResult[(StatusCode,
Result[StatusDTO])]]

onSuccess(result) {
  case _ => complete(result)
}

The mapTo is really just to assist my debugging. The result could be a
Result[StatusDTO] or a PimpedResult.
Anyway, I get the following error on the complete call:

type mismatch; found :
scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
Result[StatusDTO])]] (which expands to)
scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
scala.util.Either[APIError,StatusDTO])]] required:
akka.http.scaladsl.marshalling.ToResponseMarshallable

I added the following after the mapTo:

val prm = implicitly[Future[PimpedResult[(StatusCode, Result[StatusDTO])]]
=> ToResponseMarshallable]

and that line gets this error:

Multiple markers at this line:

not enough arguments for method implicitly: (implicit e:
scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
Result[StatusDTO])]] ⇒
akka.http.scaladsl.marshalling.ToResponseMarshallable)scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
Result[StatusDTO])]] ⇒
akka.http.scaladsl.marshalling.ToResponseMarshallable. Unspecified value
parameter e.
No implicit view available from
scala.concurrent.Future[PimpedResult[(akka.http.scaladsl.model.StatusCode,
Result[StatusDTO])]] ⇒
akka.http.scaladsl.marshalling.ToResponseMarshallable.

Here is the custom marshaller. I know it's in scope because the
errorMarshaller works in the case where the response is Result[StatusDTO]
rather than PimpedResult

// See https://bitbucket.org/binarycamp/spray-contrib/src
trait APICustomMarshallers {

  implicit def errorMarshaller[T](
implicit translator: ErrorTranslator[T],
statusCodeMapper: ErrorToStatusCodeMapper,
m: ToEntityMarshaller[T]): ToResponseMarshaller[APIError] = {

val base =
PredefinedToResponseMarshallers.fromStatusCodeAndHeadersAndValue
val result = base.compose {
  error: APIError =>
(statusCodeMapper.orElse(ErrorToStatusCodeMapper.Default)(error),
DefaultHttpHeaders.NoCacheHeaders, translator(error))
}
result
  }

  implicit def statusMarshaller[T](implicit m: ToEntityMarshaller[T]):
ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {
val base =
PredefinedToResponseMarshallers.fromStatusCodeAndHeadersAndValue // Needs
the entity marshaller
val result = base.compose {
  pimpedResult: PimpedResult[(StatusCode, T)] =>
pimpedResult.result match {
  case Right(response) => (response._1,
DefaultHttpHeaders.NoCacheHeaders, response._2)
  case _   => throw new
RuntimeException("StatusMarshaller got invalid PimpedResult")
}
}
result
  }
}

Thanks for any help.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Testing routes and marshalling

2016-09-06 Thread Richard Rodseth
Oy. I'm getting tied up in knots. I guess the requestHandlerProbe.reply
won't work without a proper sender. I usually put my route definition in an
actor, but moved it to a trait so that it could be used by the test kit.
But I did it like this

def route(requestHandler: ActorRef) = ...

Can anyone point me at a sample which uses the ask pattern in routes, and
also tests the routes with the ScalatestRouteTest?

On Tue, Sep 6, 2016 at 12:56 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> So the example here is not very realistic because the route does not
> depend on any actors:
>
> http://doc.akka.io/docs/akka/2.4.9/scala/http/routing-dsl/
> testkit.html#Usage
>
> I have a status api that returns Either[APIError, StatusDTO].
>
> val statusRoute = path("status") {
>
>   get {
>
> handleErrors {
>
>   onSuccess(requestHandler ? RequestHandler.AskForStatus) {
>
>   complete(result)
>
>   }
>
> }
>
>   }
>
> }
>  Pondering whether to make a mock actor for requestHandler, or use a probe.
>
> This works:
>
>   Get("/status") ~> myRoute ~> check {
>
> requestHandlerProbe.expectMsgClass(100 millis,
> RequestHandler.AskForStatus.getClass)
>
>   }
>
> This times out
>
>   Get("/status") ~> myRoute ~> check {
>
> requestHandlerProbe.expectMsgClass(100 millis,
> RequestHandler.AskForStatus.getClass)
>
>  val result: Either[APIError, StatusDTO] = Right(StatusDTO("message"))
>
> requestHandlerProbe.reply(result)
>
> println(this.responseEntity)
>
>   }
>
> Any errors in my use of probe.reply ? Would it be better to test response
> marshalling on its own and limit routing tests to expectMsg?
>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Spray to Akka HTTP

2016-09-02 Thread Richard Rodseth
I've yet to test this out, but it compiles. Hope I'm on the right track.

trait APIStatusMarshallers {


  implicit def errorMarshaller[T](implicit translator: ErrorTranslator[T],

  statusCodeMapper: ErrorToStatusCodeMapper,

  m: ToEntityMarshaller[T]):
ToResponseMarshaller[APIError] = {


val base = PredefinedToResponseMarshallers
.fromStatusCodeAndHeadersAndValue

val result = base.compose {

  error: APIError => (statusCodeMapper.orElse(ErrorToStatusCodeMapper
.Default)(error), DefaultHttpHeaders.NoCacheHeaders, translator(error))

}

result

  }


  implicit def statusMarshaller[T](implicit m: ToEntityMarshaller[T]):
ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {

val base = PredefinedToResponseMarshallers
.fromStatusCodeAndHeadersAndValue

val result = base.compose {

  pimpedResult: PimpedResult[(StatusCode, T)] =>

pimpedResult.result match {

  case Right(response) => (response._1,
DefaultHttpHeaders.NoCacheHeaders,
response._2)

  case _   => throw new RuntimeException("StatusMarshaller
got invalid PimpedResult")

}

}

result

  }

}

On Fri, Sep 2, 2016 at 1:01 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> I'm guessing the errorMarshaller and statusMarshaller shown (which use
> compose) need to be recast according to
>
> http://doc.akka.io/docs/akka/2.4.9/scala/http/common/
> marshalling.html#Deriving_Marshallers
>
> But that paragraph of documentation could really use an example.
>
>
> On Thu, Sep 1, 2016 at 3:19 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I'm struggling to convert the following response marshaller.
>> ToResponseMarshaller is a trait and object in Spray, and Marshaller takes
>> one type parameter.
>>
>> I've read the docs, and am stumped. Can anyone provide further guidance?
>>
>> // See https://bitbucket.org/binarycamp/spray-contrib/src
>>
>>
>> trait APIStatusMarshallers {
>>
>>
>> implicit def errorMarshaller[T](implicit translator: ErrorTranslator[
>> T],
>>
>>   statusCodeMapper:
>> ErrorToStatusCodeMapper,
>>
>>   m: Marshaller[T]): ToResponseMarshaller
>> [APIError] =
>>
>> ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {
>>
>>   error => (statusCodeMapper.orElse(ErrorToStatusCodeMapper.Default)(
>> error), DefaultHttpHeaders.NoCacheHeaders,translator(error))
>>
>> }
>>
>>
>>
>> implicit def statusMarshaller[T](implicit m: Marshaller[T]):
>> ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {
>>
>> ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {
>>
>>   pimpedResult => pimpedResult.result match {
>>
>> case Right(response) => (response._1, DefaultHttpHeaders.
>> NoCacheHeaders, response._2)
>>
>> case _ => throw new RuntimeException("StatusMarshaller got
>> invalid PimpedResult")
>>
>>   }
>>
>> }
>>
>>   }
>>
>> }
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Spray to Akka HTTP

2016-09-02 Thread Richard Rodseth
I'm guessing the errorMarshaller and statusMarshaller shown (which use
compose) need to be recast according to

http://doc.akka.io/docs/akka/2.4.9/scala/http/common/marshalling.html#Deriving_Marshallers

But that paragraph of documentation could really use an example.


On Thu, Sep 1, 2016 at 3:19 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> I'm struggling to convert the following response marshaller.
> ToResponseMarshaller is a trait and object in Spray, and Marshaller takes
> one type parameter.
>
> I've read the docs, and am stumped. Can anyone provide further guidance?
>
> // See https://bitbucket.org/binarycamp/spray-contrib/src
>
>
> trait APIStatusMarshallers {
>
>
> implicit def errorMarshaller[T](implicit translator: ErrorTranslator[T
> ],
>
>   statusCodeMapper:
> ErrorToStatusCodeMapper,
>
>   m: Marshaller[T]): ToResponseMarshaller[
> APIError] =
>
> ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {
>
>   error => (statusCodeMapper.orElse(ErrorToStatusCodeMapper.Default)(
> error), DefaultHttpHeaders.NoCacheHeaders,translator(error))
>
> }
>
>
>
> implicit def statusMarshaller[T](implicit m: Marshaller[T]):
> ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {
>
> ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {
>
>   pimpedResult => pimpedResult.result match {
>
> case Right(response) => (response._1, DefaultHttpHeaders.
> NoCacheHeaders, response._2)
>
> case _ => throw new RuntimeException("StatusMarshaller got
> invalid PimpedResult")
>
>   }
>
> }
>
>   }
>
> }
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Spray to Akka HTTP

2016-09-01 Thread Richard Rodseth
I'm struggling to convert the following response marshaller.
ToResponseMarshaller is a trait and object in Spray, and Marshaller takes
one type parameter.

I've read the docs, and am stumped. Can anyone provide further guidance?

// See https://bitbucket.org/binarycamp/spray-contrib/src


trait APIStatusMarshallers {


implicit def errorMarshaller[T](implicit translator: ErrorTranslator[T],

  statusCodeMapper: ErrorToStatusCodeMapper,

  m: Marshaller[T]): ToResponseMarshaller[
APIError] =

ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {

  error => (statusCodeMapper.orElse(ErrorToStatusCodeMapper.Default)(
error), DefaultHttpHeaders.NoCacheHeaders,translator(error))

}



implicit def statusMarshaller[T](implicit m: Marshaller[T]):
ToResponseMarshaller[PimpedResult[(StatusCode, T)]] = {

ToResponseMarshaller.fromStatusCodeAndHeadersAndT.compose {

  pimpedResult => pimpedResult.result match {

case Right(response) => (response._1, DefaultHttpHeaders.
NoCacheHeaders, response._2)

case _ => throw new RuntimeException("StatusMarshaller got invalid
PimpedResult")

  }

}

  }

}

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Configurable parallelism/elastic groupBy

2016-07-22 Thread Richard Rodseth
Thanks Endre. Ticket created

https://github.com/akka/akka/issues/21020

Perhaps I mislead by using the word "elastic" in my subject. I just need
"configurable". That said, why would I need consistent hashing? It seems
that if I'm just preserving order within a channel by ensuring that all
stream elements from a channel are sent to the same worker, channelId
modulo n would suffice, no?

Also, shouldn't there just be a partition() operator?

On Fri, Jul 22, 2016 at 3:10 AM, Akka Team <akka.offic...@gmail.com> wrote:

>
>
> On Fri, Jul 22, 2016 at 2:14 AM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Or perhaps you are talking about something like
>>
>>
>> http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
>>
>
> Yes
>
>
>>
>> but using the Partition stage instead of Balance
>>
>
>>
>> http://doc.akka.io/api/akka/2.4.7/?_ga=1.10841340.933910455.1454640777#akka.stream.scaladsl.Partition
>>
>
> Exactly!
>
>
>>
>> which does not appear to be documented here
>>
>>
>> http://doc.akka.io/docs/akka/2.4.8/scala/stream/stages-overview.html#Fan-out_stages
>>
>
>
> Meh, that is true. Please file a ticket (or even better, send a PR towards
> us :) ).
>
> -Endre
>
>
>>
>>
>> On Thu, Jul 21, 2016 at 10:00 AM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> Thanks. That gives me some terms to Google, but any further pointers
>>> welcome. I see references to FlexiRoute being replaced by GraphStage.
>>> Presumably that's what I need to study next?
>>>
>>> On Thu, Jul 21, 2016 at 12:59 AM, Viktor Klang <viktor.kl...@gmail.com>
>>> wrote:
>>>
>>>> A routing stage with consistent hashing and then a N-Way merge to
>>>> insert into db?
>>>>
>>>> --
>>>> Cheers,
>>>> √
>>>>
>>>> On Jul 20, 2016 9:09 PM, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>>>
>>>>> I'm sure I've asked this before in numerous ways, but it's still an
>>>>> issue for me.
>>>>>
>>>>> I have an ETL stream that reads per-channel data and writes it to a
>>>>> destination without backpressure. Within a channel, order of writes must 
>>>>> be
>>>>> preserved. So I want parallelism between channels, but not within.
>>>>>
>>>>> If I groupBy by channel, I can't control the degree of parallelism
>>>>>  and the destination is overwhelmed.
>>>>> Should I make an ActorSubscriber for the write, that backpressures
>>>>> (based on an integer value) and use alsoTo on the SubFlow?
>>>>>
>>>>> Any way to achieve this with standard components?
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ:
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives:
>>>>> https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>>> To post to this group, send email to akka-user@googlegroups.com.
>>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ:
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives:
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+unsubscr...@googlegroups.com.
>>>> To post to this group, send email to akka-user@googlegroups.com.
>>&

Re: [akka-user] Configurable parallelism/elastic groupBy

2016-07-21 Thread Richard Rodseth
Or perhaps you are talking about something like

http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

but using the Partition stage instead of Balance

http://doc.akka.io/api/akka/2.4.7/?_ga=1.10841340.933910455.1454640777#akka.stream.scaladsl.Partition

which does not appear to be documented here

http://doc.akka.io/docs/akka/2.4.8/scala/stream/stages-overview.html#Fan-out_stages

On Thu, Jul 21, 2016 at 10:00 AM, Richard Rodseth <rrods...@gmail.com>
wrote:

> Thanks. That gives me some terms to Google, but any further pointers
> welcome. I see references to FlexiRoute being replaced by GraphStage.
> Presumably that's what I need to study next?
>
> On Thu, Jul 21, 2016 at 12:59 AM, Viktor Klang <viktor.kl...@gmail.com>
> wrote:
>
>> A routing stage with consistent hashing and then a N-Way merge to insert
>> into db?
>>
>> --
>> Cheers,
>> √
>>
>> On Jul 20, 2016 9:09 PM, "Richard Rodseth" <rrods...@gmail.com> wrote:
>>
>>> I'm sure I've asked this before in numerous ways, but it's still an
>>> issue for me.
>>>
>>> I have an ETL stream that reads per-channel data and writes it to a
>>> destination without backpressure. Within a channel, order of writes must be
>>> preserved. So I want parallelism between channels, but not within.
>>>
>>> If I groupBy by channel, I can't control the degree of parallelism  and
>>> the destination is overwhelmed.
>>> Should I make an ActorSubscriber for the write, that backpressures
>>> (based on an integer value) and use alsoTo on the SubFlow?
>>>
>>> Any way to achieve this with standard components?
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to akka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Configurable parallelism/elastic groupBy

2016-07-21 Thread Richard Rodseth
Thanks. That gives me some terms to Google, but any further pointers
welcome. I see references to FlexiRoute being replaced by GraphStage.
Presumably that's what I need to study next?

On Thu, Jul 21, 2016 at 12:59 AM, Viktor Klang <viktor.kl...@gmail.com>
wrote:

> A routing stage with consistent hashing and then a N-Way merge to insert
> into db?
>
> --
> Cheers,
> √
>
> On Jul 20, 2016 9:09 PM, "Richard Rodseth" <rrods...@gmail.com> wrote:
>
>> I'm sure I've asked this before in numerous ways, but it's still an issue
>> for me.
>>
>> I have an ETL stream that reads per-channel data and writes it to a
>> destination without backpressure. Within a channel, order of writes must be
>> preserved. So I want parallelism between channels, but not within.
>>
>> If I groupBy by channel, I can't control the degree of parallelism  and
>> the destination is overwhelmed.
>> Should I make an ActorSubscriber for the write, that backpressures (based
>> on an integer value) and use alsoTo on the SubFlow?
>>
>> Any way to achieve this with standard components?
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Configurable parallelism/elastic groupBy

2016-07-20 Thread Richard Rodseth
I'm sure I've asked this before in numerous ways, but it's still an issue
for me.

I have an ETL stream that reads per-channel data and writes it to a
destination without backpressure. Within a channel, order of writes must be
preserved. So I want parallelism between channels, but not within.

If I groupBy by channel, I can't control the degree of parallelism  and the
destination is overwhelmed.
Should I make an ActorSubscriber for the write, that backpressures (based
on an integer value) and use alsoTo on the SubFlow?

Any way to achieve this with standard components?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Access to LoggingAdapter

2016-07-14 Thread Richard Rodseth
No, I'm just in a function creating a flow that includes
mapAsync(retry(somethingReturningFuture(..),...). Maybe I should just
create an adapter at the beginning of the program and pass it as a
parameter.

On Wed, Jul 13, 2016 at 11:38 PM, Akka Team <akka.offic...@gmail.com> wrote:

> If you are writing a custom graph stage, you can access it inside the
> GraphStageLogic from the ActorSystem like this:
>
> materializer match {
>   case mat: ActorMaterializer => mat.logger
>   case _ => throw new RuntimeException("this stage only works with an actor 
> materializer")
> }
>
> Note that this may make the stage impossible to use with any future
> alternative materializers.
>
> --
> Johan
>
> On Wed, Jul 13, 2016 at 9:54 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> Is there a way to get hold of the LoggingAdapter used by Akka streams?
>>
>> I'm making use of Viktor's Future retry
>>
>> https://gist.github.com/viktorklang/9414163
>>
>> and someone would like me to log something in the retry case. So I
>> thought I would add an implicit LoggingAdapter parameter and supply it at
>> the call site (which is a mapAsync call).
>>
>> Thanks
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Access to LoggingAdapter

2016-07-13 Thread Richard Rodseth
Is there a way to get hold of the LoggingAdapter used by Akka streams?

I'm making use of Viktor's Future retry

https://gist.github.com/viktorklang/9414163

and someone would like me to log something in the retry case. So I thought
I would add an implicit LoggingAdapter parameter and supply it at the call
site (which is a mapAsync call).

Thanks

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Actor stream to file

2016-06-23 Thread Richard Rodseth
I have a stream which needs to output some progress/error text to a file
and to console (different things to each).

I've implemented something using alsoTo(fileSink).alsoTo(consoleSink).

But now I'm considering funneling these messages through an actor
intermediary instead. i.e. alsoTo(outputActor)

Wondering how I'd set things up to make the output actor stream some of its
messages to a  file sink. Would it be an ActorPublisher that connected
itself to the file sink?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: DI and Testing Streams

2016-05-10 Thread Richard Rodseth
Not really, but thanks. My biggest challenge is that I have a composite
Source derived from multiple Slick 3 Sources.

eg.
val compositeSource = channelSource.flapMapConcat(ch => intervalSource(ch) )

For the moment I have a mockable trait with methods to construct those
sources.


On Tue, May 10, 2016 at 2:33 AM, Akka Team <akka.offic...@gmail.com> wrote:

> Hi Richard,
>
> In streams, the best way to inject dependencies is to make them separate
> stages/modules, where applicable. For example you can model a service that
> takes requests and provides some responses as a Flow. Now you can freely
> use it in tests with various test Sources/Sinks or probes. However, if you
> want to be able to stub out some underlying service this Flow depends on
> (maybe HTTP), you can still do it in various ways:
>
>  - if the low level service is something like A => Future[B] then you can
> just take it as a parameter when constructing the Flow
>  - if the low level service can be modeled as a Flow, then you can model
> your high-level service as a BidiFlow. in this case you can do something
> like:
>
> val lowLevelServiceStub: Flow[LowA, LowB] = ... //stub it
> val highLevelService: BidiFlow[A, LowA, LowB, B] = ... // service under
> test
> val testService: Flow[A, B] = highLevelService join lowLevelServiceStub
>
> I am not sure this is completely what you asked for though.
>
> -Endre
>
> On Fri, May 6, 2016 at 5:51 PM, Richard Rodseth <rrods...@gmail.com>
> wrote:
>
>> I just re-read:
>>
>> http://techblog.realestate.com.au/to-kill-a-mockingtest/
>>
>> It's hard for me to see how to apply this thinking to a side effect like
>> alsoTo() , used for monitoring in my application.
>> On the input side, to make "pure" a function that creates a flow that's
>> built of multiple sources (using flatMapConcat) I could imagine function
>> parameters rather than a DbSources trait, and test code that provides
>> different functions (rather that stubbing the trait). But I'm not sure that
>>
>> case class DbSources(f1: A=>Source[B, NotUsed], f2: X=>Source[Y,NotUsed])
>>
>> is any better than stubable:
>>
>> trait DbSources {
>>   def f1(a:A):Source[B,NotUsed]
>>   def f2(x:X):Source[Y,NotUsed]
>> }
>>
>> Anyone else wrestled with the same?
>>
>> On Wed, May 4, 2016 at 10:03 PM, Richard Rodseth <rrods...@gmail.com>
>> wrote:
>>
>>> I have some streams to test. Each one implements a particular "command".
>>> In recent days I have warmed up to using a combination of implicit
>>> parameters and constructor injection for DI.
>>>
>>>
>>> http://carefulescapades.blogspot.com/2012/05/using-implicit-parameters-for.html
>>>
>>> Some of my former singletons are now classes extending traits or
>>> abstract classes.
>>>
>>> But I have some remaining standalone functions (in Singletons) with
>>> rather long implicit parameter lists for their dependencies. eg:
>>>
>>> def command1(actualArgsForCommand...)(implicit ec: ExecutionContext, mat:
>>> ActorMaterializer, reader: SomeReader, writer: SomeWriter, monitoring:
>>> Monitoring):Future[SomeType]
>>>
>>> I can now mock SomeReader and SomeWriter, and use a test probe for the
>>> monitor actor (a dependency of MonitoringImpl). But the method signature is
>>> a bit ugly, even with the dependencies in their own parameter list.
>>>
>>> Is there some way to group the dependencies for a particular command in
>>> a case class or something while still having them "export" implicits, or
>>> would I have to redeclare within the method body?
>>> implicit val reader = groupedDeps.reader
>>> etc
>>>
>>> Or am I better off making a class per command (or related commands with
>>> the same dependencies), perhaps leaving the ec, and mat in method
>>> signatures, while converting the reader, writer and monitoring to class
>>> constructor arguments?
>>>
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr

  1   2   3   >