Re: [akka-user] Akka cluster sharding performance issue [creating sharding actor very very slow]
Are you seeing this once, or are you seeing this slowness in steady state?

From the log, this looks like it's the first call: the shard doesn't exist yet, and the system is still setting up. That *can* take a while, yes: it's not just allocating one Actor, it's probably allocating several in a hierarchy. But once the system is in a reasonably steady state, in my experience, it's much faster than that.

In general, sending one message isn't going to give you an accurate view of anything. Sending several thousand messages, to several hundred Actors, is going to give you a much more realistic understanding of how long things take...

On Fri, Jul 14, 2017 at 12:22 AM, Chaohua Yuan wrote:
> Hi,
>
> I am trying to build a backend system with akka cluster and cluster
> sharding. Everything goes well except the performance issue when using akka
> cluster sharding.
>
> It seems creating the actor with shard region actor is extremely slow (the
> shard actor is created when tell is called on the region actor). Has anyone
> met this issue before, or is this just how sharding works?
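The advice above (ignore the first call, then measure many messages across many actors) can be sketched as a small timing harness. This is only an illustration in plain Java: `SteadyStateTimingSketch`, `measure`, and the `sendAndAwaitReply` callback are hypothetical names, and the callback is assumed to block until the reply for the given entity id has arrived (for example by wrapping an ask against the shard region).

```java
import java.util.function.Consumer;

/** Hypothetical harness: separates first-call cost from steady-state cost. */
final class SteadyStateTimingSketch {

    /** Timings collected by one run. */
    static final class Result {
        final long firstCallNanos;     // cost of the very first message
        final double meanSteadyNanos;  // mean cost per message after warm-up
        final int measured;            // number of messages in the measured phase

        Result(long firstCallNanos, double meanSteadyNanos, int measured) {
            this.firstCallNanos = firstCallNanos;
            this.meanSteadyNanos = meanSteadyNanos;
            this.measured = measured;
        }
    }

    /**
     * @param sendAndAwaitReply assumed to send one message to the given entity id
     *                          and block until its reply has been received
     */
    static Result measure(Consumer<String> sendAndAwaitReply, int entities, int messages) {
        // 1. The first call pays for shard and entity start-up; time it separately.
        long t0 = System.nanoTime();
        sendAndAwaitReply.accept("entity-0");
        long first = System.nanoTime() - t0;

        // 2. Warm-up: touch every entity once so that all of them exist.
        for (int i = 0; i < entities; i++) {
            sendAndAwaitReply.accept("entity-" + i);
        }

        // 3. Measured phase: many messages spread over all entities.
        long start = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            sendAndAwaitReply.accept("entity-" + (i % entities));
        }
        long total = System.nanoTime() - start;

        return new Result(first, (double) total / messages, messages);
    }

    public static void main(String[] args) {
        // Stand-in operation; replace with a real blocking round trip.
        Result r = measure(id -> { }, 200, 5000);
        System.out.println("first call: " + r.firstCallNanos + " ns");
        System.out.println("steady state mean: " + r.meanSteadyNanos + " ns over " + r.measured + " messages");
    }
}
```

Comparing `firstCallNanos` with `meanSteadyNanos` shows whether the slowness is a one-off start-up cost or persists once the entities exist.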
Re: [akka-user] Learn Akka From Samples & Examples
The getting started guide is a good place to get started ;)
http://doc.akka.io/docs/akka/current/java/guide/introduction.html

--
Johan
Akka Team

On Thu, Jul 13, 2017 at 7:02 PM, Bharath Kandaswami wrote:
> Hi Folks,
>
> I am a newbie to Akka, just wanted to explore and play around with Akka a
> little bit to see if I can use Akka in place of traditional Java
> development tools, can someone show me the right direction to reach there.
>
> Any help is deeply appreciated.
>
> Kind Regards,
> KK>Bharath.

--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
[akka-user] Alpakka: how to use the DynamoDB connector for large result sets
Hi,

I have started to use the Alpakka DynamoDB connector and don't know how to deal with query pagination via LastEvaluatedKey. I managed to create an Akka Streams graph with a loop which runs queries repeatedly, but unfortunately it doesn't stop, even when filtering for an empty LastEvaluatedKey. I worked around the problem by using an iterator, but am wondering if there is a better way.

Cheers,
Jochen
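For reference, the pagination logic itself can be written as an "unfold": start with no key, run a query, emit the page, and stop once a page comes back without a LastEvaluatedKey. The sketch below is plain Java and does not use the Alpakka connector; `Page`, `fetchAll`, and `fakeQuery` are hypothetical stand-ins for a real query call. Akka Streams' `Source.unfoldAsync` has the same shape and may be worth trying in place of a hand-built loop, though that is a suggestion, not something tested against the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical pagination sketch: follow LastEvaluatedKey until it is absent. */
final class PaginationSketch {

    /** Stand-in for one query response (items plus optional continuation key). */
    static final class Page {
        final List<String> items;
        final Optional<String> lastEvaluatedKey;

        Page(List<String> items, Optional<String> lastEvaluatedKey) {
            this.items = items;
            this.lastEvaluatedKey = lastEvaluatedKey;
        }
    }

    /** Runs the query repeatedly, passing the previous page's key as the start key. */
    static List<String> fetchAll(Function<Optional<String>, Page> query) {
        List<String> all = new ArrayList<>();
        Optional<String> startKey = Optional.empty();
        do {
            Page page = query.apply(startKey);
            all.addAll(page.items);            // emit the page before checking for the end
            startKey = page.lastEvaluatedKey;  // an empty key means this was the last page
        } while (startKey.isPresent());
        return all;
    }

    /** Fake three-page result set, used only for the demonstration. */
    static Page fakeQuery(Optional<String> startKey) {
        if (!startKey.isPresent()) {
            return new Page(Arrays.asList("a", "b"), Optional.of("b"));
        }
        if (startKey.get().equals("b")) {
            return new Page(Arrays.asList("c", "d"), Optional.of("d"));
        }
        return new Page(Collections.singletonList("e"), Optional.empty());
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(PaginationSketch::fakeQuery)); // prints [a, b, c, d, e]
    }
}
```

The termination condition lives in the loop state, not in a downstream filter, which is the usual reason a cyclic graph keeps running: filtering out the empty key stops elements from flowing but does not complete the stream.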
Re: [akka-user] Need help understanding the warning on my console
At the end of each test or test suite, you stop the actor system that was running, usually using TestKit.shutdownActorSystem. This method takes a timeout; if it hasn't managed to terminate the actor system before the timeout, the call fails with the error message you describe.

All actors are stopped before the ActorSystem shutdown completes. If there is an actor blocking, or simply not completing the processing of a message (let's say it does a while(true) { ... }, for example), this will hinder the shutdown.

You can figure out exactly what in your code is blocking or hindering actor shutdown by taking a thread dump. You should be able to see your own logic in one of the stack traces, hinting you towards what is wrong.

--
Johan
Akka Team

On Thu, Jul 13, 2017 at 4:59 PM, usermda wrote:
> Hello
>
> I started getting this warning with a test failure (intermittent) that I
> have no clue what it means. Any help on what the warning indicates will be
> helpful. Here is the warning
> I need some help understanding this warning
>
> [WARN] [07/13/2017 09:35:53.465] [main] [ActorSystem(default)] Failed to stop [default] within [10 seconds]
> -> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 1 children
>    ⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=2 Terminating(UserRequest)
>        |    toDie: Actor[akka://default/system/deadLetterListener#-47810243]
>        |           Actor[akka://default/system/testActor1#-1858382714]
>        |           Actor[akka://default/system/testActor2#-1013315176]
>        |           Actor[akka://default/system/testActor3#-639523170]
>        ⌊-> deadLetterListener RepointableActorRef null status=1 terminated
>        ⌊-> testActor1 RepointableActorRef null status=1 terminated
>        ⌊-> testActor2 RepointableActorRef null status=1 terminated
>        ⌊-> testActor3 RepointableActorRef class akka.testkit.TestActor status=0 Terminating(UserRequest)
>        |   |    toDie: TestActor[akka://default/system/testActor3/1ae159bf-59a1-4899-884d-105705d1470b]
>        |   ⌊-> 1ae159bf-59a1-4899-884d-105705d1470b TestActorRef class myParentActor status=0 1 children
>        |       ⌊-> $a LocalActorRef class myFirstChild status=2 no children
>        ⌊-> testActor4 RepointableActorRef class akka.testkit.TestActor status=0 no children
>        ⌊-> testActor5 RepointableActorRef class akka.testkit.TestActor status=0 1 children
>        |   ⌊-> dc2a094f-a12e-4221-8875-e400065b522f TestActorRef class MyParentActor status=0 no children
>        ⌊-> testActor6 RepointableActorRef class akka.testkit.TestActor status=0 no children
>        ⌊-> testActor7 RepointableActorRef class akka.testkit.TestActor status=0 1 children
>        |   ⌊-> 643d6fc5-bcd6-4c27-9da6-3f04c59bccfd TestActorRef class MyParentActor status=0 no children
>        ⌊-> testActor8 RepointableActorRef class akka.testkit.TestActor status=0 no children
>
> I have a test that is expecting a certain message after the myParentActor
> has completed a task that is timing out. I am trying to figure out if they
> are related
>
> I am not sure what more info to provide
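A thread dump is usually taken from outside the JVM (for example with the JDK's `jstack <pid>`), but it can also be produced from inside a test, right before shutting the actor system down. The following is a minimal sketch using only the standard `Thread.getAllStackTraces()` call; the class name `ThreadDumpSketch` and the output format are made up for illustration.

```java
import java.util.Map;

/** Hypothetical helper: renders the stack traces of all live threads as text. */
final class ThreadDumpSketch {

    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            // Header line: thread name and its current state (RUNNABLE, BLOCKED, WAITING, ...)
            sb.append('"').append(thread.getName()).append("\" state=")
              .append(thread.getState()).append('\n');
            // One line per stack frame, innermost frame first
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

In the dump, look for dispatcher threads whose frames include your own actor classes (such as the parent or child actor from the warning); a thread stuck in a loop or a blocking call inside `receive` is the likely reason the system cannot stop in time.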
Re: [akka-user] Re: formField recent change
Likely because of the fix for this bug: https://github.com/akka/akka-http/issues/1134

Before 10.0.8, Akka HTTP would just blindly guess the charset UTF-8 even if the request did not specify a charset, while now it has a specific type representing a missing charset instead, to give user server logic a chance to decide what to do.

--
Johan
Akka Team

On Tue, Jun 27, 2017 at 6:33 PM, Christophe Pache wrote:
> The difference seems to come from that (using request logging):
> HttpEntity.Strict(application/x-www-form-urlencoded; charset=UTF-8,foo=bar)
> becomes
> HttpEntity.Strict(application/x-www-form-urlencoded,ByteString(102, 111, 111, 61, 98, 97, 114))
>
> so is fixed using the charset.
>
> On Monday, June 26, 2017 at 07:53:29 UTC+2, Christophe Pache wrote:
>>
>> Hello everyone!
>>
>> I'm using forms in an akka-http app and I have an issue with the
>> `formFieldMap` directive: it started to fail after an akka update (akka-http
>> 10.0.6 -> 10.0.8 and akka 2.5.1 -> 2.5.3).
>> I feel like the FormData was not required before this change (as some of
>> my tests also use akka-http, it was fine to just add an empty FormData and it
>> solved the issue).
>>
>> My main question is: if I don't provide form data in my request, would
>> a formFieldMap directive be taken or not? What about the parameterMap, which
>> seems to be taken even if empty?
>>
>> I'm currently stuck trying to make tests using basic HttpURLConnection work
>> with the update (they were working before). Has someone already tested
>> formFieldMap/Seq without akka-http test support? Here is part of the code I'm
>> using to test:
>>
>> val conn: HttpURLConnection = createHttpConnection(urlEndpoint)
>> conn.setRequestMethod("GET")
>> conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded")
>> val postDataBytes: Array[Byte] = "foo=bar".getBytes("UTF-8")
>> conn.setRequestProperty("Content-Length", String.valueOf(postDataBytes.length))
>> conn.getOutputStream.write(postDataBytes)
>>
>> val response: Int = conn.getResponseCode
>> val inputStream: InputStream = conn.getInputStream
>> (readUtf8InputStream(inputStream), conn.getHeaderFields)
>>
>> Many thanks for any hint and have a nice day!
>> Christophe
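Since the thread reports that the request works again once the charset is present in the Content-Type, a client built on HttpURLConnection can state it explicitly. The sketch below is one way to do that in plain Java; `FormPostSketch`, `contentType`, `encodeForm`, and `post` are hypothetical helper names, and the request is sent as a POST with `setDoOutput(true)`, which is a choice made for this sketch and differs from the test code quoted above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical client helper: form post with an explicit charset parameter. */
final class FormPostSketch {

    /** Content-Type value that names the charset used to encode the body. */
    static String contentType(Charset charset) {
        return "application/x-www-form-urlencoded; charset=" + charset.name();
    }

    /** URL-encodes the fields as key=value pairs joined by '&'. */
    static byte[] encodeForm(Map<String, String> fields, Charset charset) {
        StringBuilder body = new StringBuilder();
        try {
            for (Map.Entry<String, String> field : fields.entrySet()) {
                if (body.length() > 0) {
                    body.append('&');
                }
                body.append(URLEncoder.encode(field.getKey(), charset.name()))
                    .append('=')
                    .append(URLEncoder.encode(field.getValue(), charset.name()));
            }
        } catch (UnsupportedEncodingException e) {
            // Not expected: the name comes from a Charset the JVM already resolved.
            throw new IllegalStateException(e);
        }
        return body.toString().getBytes(charset);
    }

    /** Sends the form and returns the HTTP status code. */
    static int post(String url, Map<String, String> fields) throws IOException {
        byte[] body = encodeForm(fields, StandardCharsets.UTF_8);
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true); // required before writing a request body
        conn.setRequestProperty("Content-Type", contentType(StandardCharsets.UTF_8));
        conn.setFixedLengthStreamingMode(body.length); // sets Content-Length
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        return conn.getResponseCode();
    }
}
```

With a single field `foo=bar`, `encodeForm` yields the same seven bytes shown in the request log above (102, 111, 111, 61, 98, 97, 114), and the header value matches the `application/x-www-form-urlencoded; charset=UTF-8` form that the server accepted.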
[akka-user] Elastic4s and Kafka
Hi,

has anybody ever tried to use the Elastic4s reactive streaming extension with Kafka? I've found this https://github.com/yannick-cw/elastic-indexer4s where you only need an akka.stream.scaladsl.Source[A] and a com.sksamuel.elastic4s.Indexable[A].

Is this already the classical Source->Flow->Sink implementation, or is Elastic4s the sink, so that I need Kafka Reactive Streams as the source, connected via a flow?

I don't have much experience here, and my first solution was to use Apache Flink, but the reactive component is missing (I mean backpressure here).

Thanks
Torsten
[akka-user] Re: Akka Cluster Pub/Sub performance for chat-room like applications
Thank you! I've subscribed there to follow further discussions.

On Thursday, July 13, 2017 at 4:02:24 PM UTC+3, johannes...@lightbend.com wrote:
>
> On Thursday, July 13, 2017 at 1:08:19 PM UTC+2, Alexander Lukyanchikov wrote:
>>
>> *The only question, is it capable of managing tens of millions of topics?
>> Would it perform better than our current solution?*
>>
>
> No, most likely it currently won't scale up to 1 million active topics. In
> Akka's pubsub, each node keeps a topic actor that manages subscriptions of
> local actors to this topic. Then the information about which node is
> interested in which topics is replicated across the whole cluster.
>
> We plan to test the actual memory consumption under realistic conditions
> but we haven't got to that so far.
>
> I created a ticket to estimate the memory usage and to collect ideas about
> how to optimize for bigger workloads:
>
> https://github.com/akka/akka/issues/23357
>
> Johannes
[akka-user] Akka cluster sharding performance issue [creating sharding actor very very slow]
Hi,

I am trying to build a backend system with akka cluster and cluster sharding. Everything goes well except the performance issue when using akka cluster sharding.

It seems creating the actor with shard region actor is extremely slow (the shard actor is created when tell is called on the region actor). Has anyone met this issue before, or is this just how sharding works?

My test actor in JAVA ...

public class AuthService extends BaseService {
    public AuthService(ActorRef serviceActor) {
        super(serviceActor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(AuthRequest.class, authRequest -> {
                getContext().system().log().info("Get auth request from client {}", authRequest.getToken());
                this.getServiceActor().tell(authRequest, getSender());
            }).build();
    }

    public static void config(ActorSystem system) {
        ClusterSharding.get(system)
            .start(AuthServiceActor.SHARD,
                Props.create(AuthServiceActor.class, AuthServiceActor::new),
                ClusterShardingSettings.create(system),
                AuthServiceActor.shardExtractor()
            );

        ActorRef actorRef = ClusterSharding.get(system).shardRegion(AuthServiceActor.SHARD);
        ActorRef authService = system.actorOf(Props.create(AuthService.class, actorRef), "authService");
        ClusterClientReceptionist.get(system).registerService(authService);
    }
}

The sharding actor:

public class AuthServiceActor extends AbstractActor {
    public static final String SHARD = "authService";

    @Override
    public void preStart() throws Exception {
        super.preStart();
        getContext().system().log().info("Auth service start, {}", getSelf().path());
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        getContext().system().log().info("Auth service stop {}", getSelf().path());
    }

    public Receive createReceive() {
        return receiveBuilder()
            .match(AuthRequest.class, authRequest -> {
                getContext().system().log().info("Receive auth message from user {}", authRequest.getToken());
                // No logic here, return back token as name and id.
                String userId = authRequest.getToken();
                ActorRef sender = getSender();
                sender.tell(new AuthResponse(userId, userId), getSelf());
                getContext().system().log().info("Send back auth response {}", userId);
            })
            .build();
    }

    public static ShardRegion.MessageExtractor shardExtractor() {
        return new AuthServiceActor.AuthServiceShardMessageExtractor();
    }

    private static class AuthServiceShardMessageExtractor extends ShardRegion.HashCodeMessageExtractor {
        private static final int shardNumber = 64;

        AuthServiceShardMessageExtractor() {
            super(shardNumber);
        }

        @Override
        public String entityId(Object o) {
            if (o instanceof AuthRequest) {
                return ((AuthRequest) o).getToken();
            }
            return null;
        }
    }
}

User Actor

public class UserActor extends AbstractActor {
    public final static String SHARD = "User";
    private ActorRef sessionActor;

    @Override
    public void preStart() throws Exception {
        super.preStart();
        getContext().setReceiveTimeout(scala.concurrent.duration.Duration.create(300, TimeUnit.SECONDS));
        getContext().system().log().info("User actor start {}", self().path());
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        // The "{}" placeholder is needed for the path argument to be logged.
        getContext().system().log().info("User actor stop {}", self().path());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(RegisterSession.class, registerSession -> {
            ActorRef sender = getSender();
            getContext().system().log().info("Send register success message to current session actor {}", sessionActor.path());
            getSender().tell(true, getSelf());
        }).

public class UserService extends BaseService {
    public UserService(ActorRef serviceActor) {
        super(serviceActor);
    }

    public static void config(ActorSystem system) {
        ActorRef userActor = ClusterSharding.get(system)
            .start(UserActor.SHARD,
                Props.create(UserActor.class, UserActor::new),
                ClusterShardingSettings.create(system),
                UserActor.shardExtractor()
            );

        ActorRef actorRef = ClusterSharding.get(system).shardRegion(UserActor.SHARD);
        ActorRef userService = system.actorOf(Props.create(UserService.class, actorRef), "userService");