Re: [akka-user] Exception when persisting the ShardCoordinator
Just an off the cuff guess: akka-persistence-jdbc does not provide a ConnectionPoolSettings when creating the connection pool, which means that the defaults will be provided, which does not contain a validation query, which means that the only way to test the validity of the connections to Oracle DB is via the isValid getter, which does not reset the idle timer of the connection, which leads to connections timing out. So if my guess is correct the fix is to add the proper validation query when creating the connection pool, and if memory serves me right the common query for this when it comes to Oracle DB is select 1 from dual. On Tue, Dec 23, 2014 at 11:24 PM, Miguel Vilá miguelvi...@gmail.com wrote: Thanks, Endre A colleague of mine already submitted an issue to them but we haven't received any response yet: https://github.com/dnvriend/akka-persistence-jdbc/issues/9 . Is it possible to have the coordinator's persistence use a different journal than the one used by our own persistent actors? El martes, 23 de diciembre de 2014 14:42:24 UTC-5, Akka Team escribió: Hi Miguel, This seems to be a journal issue. You should contact the maintainer of the JDBC journal. -Endre On Tue, Dec 23, 2014 at 7:43 PM, Miguel Vilá migue...@seven4n.com wrote: Hi all, I'm having a problem with akka-persistence and akka-sharding. Every now and then, sometimes after running our app for a long time I get this error: DELETE FROM USERSIS.snapshot WHERE persistence_id = '/user/sharding/ TokenRouterCoordinator/singleton/coordinator' AND sequence_nr = 2 [ERROR] [12/18/2014 13:51:28.826] [TokenCluster-akka.actor.default- dispatcher-16] [akka://TokenCluster/system/snapshot-store] No more data to read from socket java.sql.SQLRecoverableException: No more data to read from socket at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java: 1157) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:350) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531) at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedS tatement.java:208) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPr eparedStatement.java:1046) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(Orac leStatement.java:1336) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(O raclePreparedStatement.java:3613) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(Ora clePreparedStatement.java:3694) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate( OraclePreparedStatementWrapper.java:1354) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate (DelegatingPreparedStatement.java:105) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate (DelegatingPreparedStatement.java:105) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply $mcI$sp(StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$NakedExecutor.apply(StatementE xecutor.scala:33) at scalikejdbc.StatementExecutor$$anon$1.scalikejdbc$ StatementExecutor$LoggingSQLAndTiming$$super$apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor$LoggingSQLAndTiming$class.apply( StatementExecutor.scala:264) at scalikejdbc.StatementExecutor$$anon$1.scalikejdbc$ StatementExecutor$LoggingSQLIfFailed$$super$apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor$LoggingSQLIfFailed$class.apply( StatementExecutor.scala:295) at scalikejdbc.StatementExecutor$$anon$1.apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor.executeUpdate(StatementExecutor. scala:337) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply( DBSession.scala:352) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply( DBSession.scala:350) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.ActiveSession.using(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:349 ) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:327 ) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.DBConnection$class.autoCommit(DBConnection.scala:183) at scalikejdbc.DB.autoCommit(DB.scala:75) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:218) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:217) at
Re: [akka-user] Cluster unreachable and a lot of cluster connections
Endre, could it be due to pending-to-send system message overflow? On Thu, Jan 22, 2015 at 11:45 AM, Johannes Berg jberg...@gmail.com wrote: Okay, I increased the load further and now I see the same problem again. It seems to just have gotten a bit better in that it doesn't happen as fast, but with enough load it happens. To re-iterate, I have Akka 2.3.9 on all (8) nodes and auto-down-unreachable-after = off on all nodes and I don't do any manual downing anywhere, still the leader log prints this: 2015-01-22 10:35:37 + - [INFO] - from Cluster(akka://system) in system-akka.actor.default-dispatcher-2 Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing unreachable node [akka.tcp://system@ip2:port2] and the node(s) under load is(are) removed from the cluster (quarantined). How is this possible? On Wednesday, January 21, 2015 at 5:53:06 PM UTC+2, drewhk wrote: Hi Johannes, See the milestone here: https://github.com/akka/ akka/issues?q=milestone%3A2.3.9+is%3Aclosed The tickets cross reference the PRs, too, so you can look at the code changes. The issue that probably hit you is https://github.com/akka/ akka/issues/16623 which manifested as system message delivery errors on some systems, but actually was caused by accidentally duplicated internal actors (a regression). -Endre On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg jber...@gmail.com wrote: Upgrading to 2.3.9 does indeed seem to solve my problem. At least I haven't experienced them yet. Now I'm curious what the fixes were, is there somewhere a change summary between versions or where is it listed what bugs have been fixed in which versions? On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote: Hi Johannes, We just released 2.3.9 with important bugfixes. I recommend to update and see if the problem is still persisting. -Endre On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg jber...@gmail.com wrote: Many connections seem to be formed in the case when the node has been marked down for unreachability even though it's still alive and it tries to connect back into the cluster. The removed node prints: Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted. It doesn't seem to close the connections properly even though it opens new ones continously. Anyway that's a separate issue that I'm not that concerned about right now, I've now realized I don't want to use automatic downing instead I would like to allow nodes to go unreachable and come back to reachable even if it takes quite some time and manually stopping the process and downing the node in case of an actual crash. Consequently I've put auto-down-unreachable-after = off in the config. Now I have the problem that nodes still are removed, this is from the leader node log: 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4] Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing unreachable node [akka.tcp://system@ip2:port2] I can understand my node is marked unreachable beause it's under heavy load but I don't understand what could cause it to be removed. I'm not doing any manual downing and have the auto-down to off, what else could trigger the removal? Using the akka-cluster script I can see that the node has most other nodes marked as unreachable (including the leader) and that it has another leader than other nodes. My test system consists of 8 nodes. About the unreachability I'm not having long GC pauses and not sending large blobs, but I'm sending very many smaller messages as fast as I can. If I just hammer it fast enough it will end up unreachable which I can except, but I need to get it back to reachable. On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson wrote: Hi Johannes, On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com) wrote: Hi! I'm doing some load tests in our system and getting problems that some of my nodes are marked as unreachable even though the processes are up. I'm seeing it going a few times from reachable to unreachable and back a few times before staying unreachable saying connection gated for 5000ms and staying silently that way. Looking at the connections made to one of the seed nodes I see that I have several hundreds of connections from other nodes except the failing ones. Is this normal? There are several (hundreds) just between two nodes. When are connections formed between cluster nodes and when are they taken down? Several hundred connections between two nodes seems very wrong. There should only be one connection between two nodes that communicate over akka remoting or are part of a cluster. How many nodes do you have in your cluster? If you are using
Re: [akka-user] received Supervise from unregistered child ... this will not end well
Hi Marco, you'll need to update all Akka dependencies to the 2.3.9 version and make sure that your dependencies that depend on akka transitively are built for Akka 2.3.x On Thu, Jan 22, 2015 at 11:57 AM, Marco Luca Sbodio marco.sbo...@gmail.com wrote: Hi Viktor, after upgrading to akka 2.3.9 my multi-jvm-test crashes with this error: 10:49:07.654UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-21] [akka.actor.ActorSystemImpl] [ActorSystem(MySystemMultiNodeTest)] - Uncaught error from thread [MySystemMultiNodeTest-akka.actor.default-dispatcher-21] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled java.lang.AbstractMethodError: akka/actor/Actor.aroundPreStart()V at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.9.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.9.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.9.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) [akka-actor_2.10-2.3.9.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.9.jar:na] at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.10-2.3.9.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] Has something change with multi-jvm/multi-node testing from akka 2.2.4 to akka 2.3.9? Any clue? Thank you in advance, Marco On Wednesday, 21 January 2015 19:15:51 UTC, √ wrote: Hi Marco, Please upgrade to 2.3.9 if you haven't already, there was a couple of remoting-related issues fixed there. On Wed, Jan 21, 2015 at 6:45 PM, Marco Luca Sbodio marco@gmail.com wrote: I haven't. I've managed to figure out that sometimes the following code [[ int nextStepNumber = planSteps[0].getStepNumber(); Address nextAddress = planSteps[0].getPeer(). getAddress(); PlanStep[] nextPlanSteps = new PlanStep[planSteps.length]; System.arraycopy(planSteps, 0, nextPlanSteps, 0, planSteps.length); firstWorker = getContext().actorOf( Worker.mkProps(sink, nextPlanSteps).withDeploy(new Deploy(new RemoteScope(nextAddress))), Worker.actorName + nextStepNumber); firstWorker.tell(CommonApi.START, getSelf()); log.debug(started first worker); Chunk chunk = new Chunk(0, new byte[] {}); firstWorker.tell(chunk, getSelf()); log.debug(empty chunk sent to first worker); ]] generate such an error: [[ 17:20:08.195UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] [org.mysystem.actor.SubPlanner] [akka://MySystemMultiNodeTest/ user/the-engine/executor-6eb81c68-e91d-47cd-bb26- be53eee90f63/planner/sub-planner-11] - received Supervise from unregistered child Actor[akka.tcp://MySystemMulti nodet...@nd06.domain.com:3002/remote/akka.tcp/ mysystemmultinodet...@nd03.domain.com:3001/user/the- engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/ planner/sub-planner-11/worker0#-1680645824], this will not end well ]] tracing back sub-planner-11 in my logs I find the two messages log.debug(started first worker); firstWorker.tell(chunk, getSelf()); and then I get the error ... and I have no clue why Thank you in advance for any help/suggestion. Cheers, Marco On Tuesday, 20 January 2015 18:12:08 UTC, √ wrote: Have you closed over context.actorOf and execute it within a Future or similar? On Tue, Jan 20, 2015 at 6:47 PM, Marco Luca Sbodio marco@gmail.com wrote: Hello everyone, while testing my system I'm randomly getting error messages similar to the one in the subject of this topic. Here's an example: 17:27:59.265UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] [org.mysystem.actor.Worker] [akka://MySystemMultiNodeTest/ user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe60 4/planner/sub-planner-951/worker0] - received Supervise from unregistered child Actor[akka.tcp://MySystemMulti nodet...@nd06.domain.com:3002/remote/akka.tcp/MySystemMultiN odet...@nd03.domain.com:3001/user/the-engine/executor-7b7690ee-f31d- 45f1-93ef-79cba01fe604/planner/sub-planner-951/worker0/worker1.4# -430862452], this will not end well I really have no clue what might cause these errors, and what the consequences are (this will not end well). I've tried searching on the Web, but didn't find anything that
Re: [akka-user] Does akka support fiber/coroutine? (do not modify source code)
Hi Chansey, What would you look to gain by switching to coroutines? On Fri, Feb 6, 2015 at 7:44 PM, chanse...@gmail.com wrote: Hello, Does akka support fiber/coroutine? I found that all akka's dispatchers rely on JVM threads now. Is there any way to config dispatchers which could run actor on fiber/coroutine? (do not modify the akka source code) Like StubFiber in retLang which use ExecuteAllPendingUntilEmpty to run dispatcher schedule instead of actual JVM thread. PS: I maybe could make akka support it, but need modify some akka code... I am using akka.NET now. Very thanks. Br, Chansey -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Does akka support fiber/coroutine? (do not modify source code)
On Fri, Feb 6, 2015 at 8:24 PM, chanse...@gmail.com wrote: Hi Because my application is running on one thread now. (It maybe extend to multi-thread in the future) To be frank, I get benefit from akka by using actor model programming paradigm instead of its concurrency. You can use an Akka Dispatcher with a single thread and then make it non-daemonic and exit your main thread and your Akka application will be single-threaded? Br, Chansey 在 2015年2月7日星期六 UTC+8上午2:58:10,√写道: Hi Chansey, What would you look to gain by switching to coroutines? On Fri, Feb 6, 2015 at 7:44 PM, chan...@gmail.com wrote: Hello, Does akka support fiber/coroutine? I found that all akka's dispatchers rely on JVM threads now. Is there any way to config dispatchers which could run actor on fiber/coroutine? (do not modify the akka source code) Like StubFiber in retLang which use ExecuteAllPendingUntilEmpty to run dispatcher schedule instead of actual JVM thread. PS: I maybe could make akka support it, but need modify some akka code... I am using akka.NET now. Very thanks. Br, Chansey -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka.io downloads - no java?
Hi Jeffrey, Akka is built in Scala but offers Java APIs, so the Scala downloads should be just fine. However, if you are getting started I'd recommend the Activator download as you can get started with the Akka Java tutorials. On Sun, Feb 8, 2015 at 9:32 PM, Jeffrey Kelly derthn...@gmail.com wrote: I feel pretty stupid for asking this, but if I wanted to experiment with Akka for Java, where would I find it? For all the Java/Scala parallel documentation, the downloads section of the site only appears to provide Scala options (along with an entire ecosystem that I'm not interested in). I was kind of just hoping for a jar or 10. Am I missing something? Is Java Akka still a thing? jef -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: [akka-http 1.0-M2] Trying to add backpressure
The best kids of bugs are the non-bugs :) On Tue, Feb 3, 2015 at 9:06 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: As usual... after asking a question, you find the answer yourself. The tool I'm using, was keeping the same connection alive; therefore all requests from that connection were handle by the same branch. El martes, 3 de febrero de 2015, 19:57:04 (UTC), Luis Ángel Vicente Sánchez escribió: Dear hakkers, I have been trying to add back pressure to an akka-http application using a transformation Stage and I have found something... unexpected. I have created a SafetyStage that buffers up to a maximum number of elements: https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/ github/lvicentesanchez/streams/stage/SafetyStage.scala I have also simulated a random failure with a simple random number generator. Instead of handling akka-http connections using the method handleWith of the ServerBinding object, I'm creating the following FlowGraph: https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/ github/lvicentesanchez/Boot.scala#L56 The connection Source is transformed using the SafetyStage. If the maximum capacity, or a random error happens, a Left object is created; if not, a Right object is created. A FlexiRoute sends the Left requests to a sink that handles every failed/discarded connnection (it completes them with 503 error); Right requests go to a sink that handles them using an akka-http router. In my tests, once one of the Sink handles the first IncomingConnnection, all future connections are handled using the handler of that Sink. I.e. if the random number generator decided that the first connection suffered an error, all future connections would also be completed with a 503 error. I guess what I'm trying to do is fundamentally wrong, but I still don't understand why the transformation stage is completely ignored once the first connection is handled. Regards, Luis -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: [akka-http 1.0-M2] Trying to add backpressure
The best typo as well. s/kids/kinds On Tue, Feb 3, 2015 at 9:16 PM, Viktor Klang viktor.kl...@gmail.com wrote: The best kids of bugs are the non-bugs :) On Tue, Feb 3, 2015 at 9:06 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: As usual... after asking a question, you find the answer yourself. The tool I'm using, was keeping the same connection alive; therefore all requests from that connection were handle by the same branch. El martes, 3 de febrero de 2015, 19:57:04 (UTC), Luis Ángel Vicente Sánchez escribió: Dear hakkers, I have been trying to add back pressure to an akka-http application using a transformation Stage and I have found something... unexpected. I have created a SafetyStage that buffers up to a maximum number of elements: https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/ github/lvicentesanchez/streams/stage/SafetyStage.scala I have also simulated a random failure with a simple random number generator. Instead of handling akka-http connections using the method handleWith of the ServerBinding object, I'm creating the following FlowGraph: https://github.com/lvicentesanchez/random/blob/master/src/main/scala/io/ github/lvicentesanchez/Boot.scala#L56 The connection Source is transformed using the SafetyStage. If the maximum capacity, or a random error happens, a Left object is created; if not, a Right object is created. A FlexiRoute sends the Left requests to a sink that handles every failed/discarded connnection (it completes them with 503 error); Right requests go to a sink that handles them using an akka-http router. In my tests, once one of the Sink handles the first IncomingConnnection, all future connections are handled using the handler of that Sink. I.e. if the random number generator decided that the first connection suffered an error, all future connections would also be completed with a 503 error. I guess what I'm trying to do is fundamentally wrong, but I still don't understand why the transformation stage is completely ignored once the first connection is handled. Regards, Luis -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] WartRemover error on Akka receive methods
One idea: def receive = { case = ... } : Receive (On my phone so may not work) -- Cheers, √ On 15 Jan 2015 23:06, javierg javierg1...@gmail.com wrote: Hi Konrad, Adding the type signature doesn't seem to have any effect *Error:(31, 26) Inferred type containing Any* * def receive: Receive = {* * ^* I'm aware that WartRemover can be configured. I could set this particular wart to be reported as a *warning* and not an *error* but that will also make any other (valid) instances to marked as such and it's my experience that warnings get, more often than not, swept under the rug (something I was trying to avoid.) Still, like you mentioned, there doesn't seem to be a solution for this at the moment. I just wanted to be sure I wasn't missing something obvious. Many thanks for the prompt reply. On Thursday, January 15, 2015 at 4:32:47 PM UTC-5, Konrad Malawski wrote: Hello there, As I understand it, wart-remover is configurable to which warts” it should be reporting. In the case of Actor.Receive it’s not happy because it is an alias to Any = Unit. Without a large philosophical dive why it is such and not a different signature (and btw. Roland will soon soon get a new impl ot akka.typed out for preview ;-)), let’s address your problem and question at hand. Two solutions come to mind: 1) Since the wart is triggered for “inferred type contains Any”, you can write the type explicitly (def receive: Receive = …) instead of it being inferred I assume? (Did not try that though) 2) As seen on: https://github.com/puffnfresh/wartremover these warts can be enabled / disabled at will. warts can be configured and in your case you’d like to keep all “except inferred type contains Any”, so you could use: wartremoverErrors := Warts.allBut(Wart.Any) which should make it happy on receive methods. Hope this helps! Disclaimer: I did not try this, but it seems to all logically fall into place :-) -- Konrad 'ktoso’ Malawski Akka http://akka.io @ Typesafe http://typesafe.com On 15 January 2015 at 22:24:00, javierg (javie...@gmail.com) wrote: Hi all, Apologies for the more than slightly offtopic question. I recently started using WartRemover and one of the first things that I encountered is a barrage of error notifications like what follows (for, as long as I can see, every receive method in my codebase) *Error:(31, 7) Inferred type containing Any* * def receive = {* * ^* Before WartRemover this code used to compile (and run) without issues. WartRemover claims it doesn't report false positives, but I'm getting this error on cases as simple and trivial as the following (admittedly contrived) example *def receive = {* *case a:String = log.info http://log.info(a)* *case _ = log.info http://log.info(Unexpected input)* *} * Is there a way to solve this (other than asking WartRemover to not report this)? Thanks in advance, Javier -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] received Supervise from unregistered child ... this will not end well
Have you closed over context.actorOf and execute it within a Future or similar? On Tue, Jan 20, 2015 at 6:47 PM, Marco Luca Sbodio marco.sbo...@gmail.com wrote: Hello everyone, while testing my system I'm randomly getting error messages similar to the one in the subject of this topic. Here's an example: 17:27:59.265UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] [org.mysystem.actor.Worker] [akka://MySystemMultiNodeTest/user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe604/planner/sub-planner-951/worker0] - received Supervise from unregistered child Actor[akka.tcp:// mysystemmultinodet...@nd06.domain.com:3002/remote/akka.tcp/mysystemmultinodet...@nd03.domain.com:3001/user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe604/planner/sub-planner-951/worker0/worker1.4#-430862452], this will not end well I really have no clue what might cause these errors, and what the consequences are (this will not end well). I've tried searching on the Web, but didn't find anything that helped me. I'm using akka 2.2.3 Any help is highly appreciated! Thanks, Marco -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] received Supervise from unregistered child ... this will not end well
Hi Marco, Please upgrade to 2.3.9 if you haven't already, there was a couple of remoting-related issues fixed there. On Wed, Jan 21, 2015 at 6:45 PM, Marco Luca Sbodio marco.sbo...@gmail.com wrote: I haven't. I've managed to figure out that sometimes the following code [[ int nextStepNumber = planSteps[0].getStepNumber(); Address nextAddress = planSteps[0].getPeer().getAddress(); PlanStep[] nextPlanSteps = new PlanStep[planSteps.length]; System.arraycopy(planSteps, 0, nextPlanSteps, 0, planSteps.length); firstWorker = getContext().actorOf( Worker.mkProps(sink, nextPlanSteps).withDeploy(new Deploy(new RemoteScope(nextAddress))), Worker.actorName + nextStepNumber); firstWorker.tell(CommonApi.START, getSelf()); log.debug(started first worker); Chunk chunk = new Chunk(0, new byte[] {}); firstWorker.tell(chunk, getSelf()); log.debug(empty chunk sent to first worker); ]] generate such an error: [[ 17:20:08.195UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] [org.mysystem.actor.SubPlanner] [akka://MySystemMultiNodeTest/user/the-engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/planner/sub-planner-11] - received Supervise from unregistered child Actor[akka.tcp:// mysystemmultinodet...@nd06.domain.com:3002/remote/akka.tcp/mysystemmultinodet...@nd03.domain.com:3001/user/the-engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/planner/sub-planner-11/worker0#-1680645824], this will not end well ]] tracing back sub-planner-11 in my logs I find the two messages log.debug(started first worker); firstWorker.tell(chunk, getSelf()); and then I get the error ... and I have no clue why Thank you in advance for any help/suggestion. Cheers, Marco On Tuesday, 20 January 2015 18:12:08 UTC, √ wrote: Have you closed over context.actorOf and execute it within a Future or similar? On Tue, Jan 20, 2015 at 6:47 PM, Marco Luca Sbodio marco@gmail.com wrote: Hello everyone, while testing my system I'm randomly getting error messages similar to the one in the subject of this topic. Here's an example: 17:27:59.265UTC ERROR [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] [org.mysystem.actor.Worker] [akka://MySystemMultiNodeTest/ user/the-engine/executor-7b7690ee-f31d-45f1-93ef- 79cba01fe604/planner/sub-planner-951/worker0] - received Supervise from unregistered child Actor[akka.tcp://MySystemMulti nodet...@nd06.domain.com:3002/remote/akka.tcp/ mysystemmultinodet...@nd03.domain.com:3001/user/the- engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe604/ planner/sub-planner-951/worker0/worker1.4#-430862452], this will not end well I really have no clue what might cause these errors, and what the consequences are (this will not end well). I've tried searching on the Web, but didn't find anything that helped me. I'm using akka 2.2.3 Any help is highly appreciated! Thanks, Marco -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Cluster unreachable and a lot of cluster connections
Hi Johannes, see the news item: http://akka.io/news/2015/01/19/akka-2.3.9-released.html On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg jberg...@gmail.com wrote: Upgrading to 2.3.9 does indeed seem to solve my problem. At least I haven't experienced them yet. Now I'm curious what the fixes were, is there somewhere a change summary between versions or where is it listed what bugs have been fixed in which versions? On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote: Hi Johannes, We just released 2.3.9 with important bugfixes. I recommend to update and see if the problem is still persisting. -Endre On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg jber...@gmail.com wrote: Many connections seem to be formed in the case when the node has been marked down for unreachability even though it's still alive and it tries to connect back into the cluster. The removed node prints: Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted. It doesn't seem to close the connections properly even though it opens new ones continously. Anyway that's a separate issue that I'm not that concerned about right now, I've now realized I don't want to use automatic downing instead I would like to allow nodes to go unreachable and come back to reachable even if it takes quite some time and manually stopping the process and downing the node in case of an actual crash. Consequently I've put auto-down-unreachable-after = off in the config. Now I have the problem that nodes still are removed, this is from the leader node log: 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4] Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing unreachable node [akka.tcp://system@ip2:port2] I can understand my node is marked unreachable beause it's under heavy load but I don't understand what could cause it to be removed. I'm not doing any manual downing and have the auto-down to off, what else could trigger the removal? Using the akka-cluster script I can see that the node has most other nodes marked as unreachable (including the leader) and that it has another leader than other nodes. My test system consists of 8 nodes. About the unreachability I'm not having long GC pauses and not sending large blobs, but I'm sending very many smaller messages as fast as I can. If I just hammer it fast enough it will end up unreachable which I can except, but I need to get it back to reachable. On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson wrote: Hi Johannes, On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com) wrote: Hi! I'm doing some load tests in our system and getting problems that some of my nodes are marked as unreachable even though the processes are up. I'm seeing it going a few times from reachable to unreachable and back a few times before staying unreachable saying connection gated for 5000ms and staying silently that way. Looking at the connections made to one of the seed nodes I see that I have several hundreds of connections from other nodes except the failing ones. Is this normal? There are several (hundreds) just between two nodes. When are connections formed between cluster nodes and when are they taken down? Several hundred connections between two nodes seems very wrong. There should only be one connection between two nodes that communicate over akka remoting or are part of a cluster. How many nodes do you have in your cluster? If you are using cluster aware routers then there should be one connection between the router node and the rooutee nodes (can be the same connection that is used for the cluster communication). The connections between the nodes don't get torn down, they stay open, but they are reused for all remoting communication between the nodes. Also is there some limit on how many connections a node with default settings will accept? We have auto-down-unreachable-after = 10s set in our config, does this mean if the node is busy and doesn't respond in 10 seconds it becomes unreachable? Is there any reason why it would stay unreachable and not re-try to join the cluster? The auto down, setting is actually just what it says. I the node is considered unreachable for 10 seconds, it will be moved to DOWN and won't be able to come back into the cluster. The different states of the cluster and the settings are explained in the documentation. http://doc.akka.io/docs/akka/2.3.7/common/cluster.html http://doc.akka.io/docs/akka/2.3.7/scala/cluster-usage.html If you are having problems with nodes becoming unreachable then you could check if you are doing one of these things: 1) sending to large blobs as messages, that effectively block out the heart beats
Re: [akka-user] Re: Dispatcher assignment issue for Http Server
Could you put together a minimized reproducer? On Tue, Jan 20, 2015 at 5:21 PM, Randy Fox randy@connexity.com wrote: Nope. Did I find a bug or am i doing something wrong? On Saturday, January 17, 2015 at 1:21:12 PM UTC-8, √ wrote: Alright. Did you manage to sort it out? -- Cheers, √ On 16 Jan 2015 16:56, Randy Fox rand...@connexity.com wrote: I expected the thread pool to be pegged at 100 threads, but watching in visual vm showed 24 threads (default). Poking around the object structure in the debugger, I found nested deep under the flowmaterializer was a maxsize of 64 (also default). -r On Thursday, January 15, 2015 at 5:34:05 PM UTC-8, Randy Fox wrote: I am trying to assign a dispatcher to be used for my http server (using the new Akka HTTP Server) and can’t get it to honor my pool size settings. Logs show it is using the dispatcher, but visualvm shows it is not using the core pool size settings. Loos like it might be using the defaults for a thread-pool-executor. All I did was modify the example: 1. import akka.http.Http 2. import akka.stream.FlowMaterializer 3. 4. implicit val system = ActorSystem() 5. implicit val materializer = FlowMaterializer( *MaterializerSettings*(*system*).withDispatcher( *“myhttpRequestHandler.dispatcher*)) 6. 7. val serverBinding = Http(system).bind(interface = localhost, port = 8080) 8. serverBinding.connections.foreach { connection = // foreach materializes the source 9. println(Accepted new connection from + connection.remoteAddress) 10. } 11. … myhttpRequestHandler { dispatcher { type = Dispatcher executor = *thread-pool-executor* name = httprequesthandler-dispatcher thread-pool-executor { core-pool-size-min = 100 core-pool-size-factor = 2.0 core-pool-size-max = 100 } throughput = 5 } } [INFO] [2015-01-15 17:24:27,516] [DUDE-myhttpRequestHandler.dispatcher-79] HttpRequestHandler(akka://DUDE): Accepted new connection from / 127.0.0.1:54046 What am I missing? Thanks, Randy Fox -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Dispatcher assignment issue for Http Server
Alright. Did you manage to sort it out? -- Cheers, √ On 16 Jan 2015 16:56, Randy Fox randy@connexity.com wrote: I expected the thread pool to be pegged at 100 threads, but watching in visual vm showed 24 threads (default). Poking around the object structure in the debugger, I found nested deep under the flowmaterializer was a maxsize of 64 (also default). -r On Thursday, January 15, 2015 at 5:34:05 PM UTC-8, Randy Fox wrote: I am trying to assign a dispatcher to be used for my http server (using the new Akka HTTP Server) and can’t get it to honor my pool size settings. Logs show it is using the dispatcher, but visualvm shows it is not using the core pool size settings. Loos like it might be using the defaults for a thread-pool-executor. All I did was modify the example: 1. import akka.http.Http 2. import akka.stream.FlowMaterializer 3. 4. implicit val system = ActorSystem() 5. implicit val materializer = FlowMaterializer(*MaterializerSettings* (*system*).withDispatcher(*“myhttpRequestHandler.dispatcher*)) 6. 7. val serverBinding = Http(system).bind(interface = localhost, port = 8080) 8. serverBinding.connections.foreach { connection = // foreach materializes the source 9. println(Accepted new connection from + connection.remoteAddress) 10. } 11. … myhttpRequestHandler { dispatcher { type = Dispatcher executor = *thread-pool-executor* name = httprequesthandler-dispatcher thread-pool-executor { core-pool-size-min = 100 core-pool-size-factor = 2.0 core-pool-size-max = 100 } throughput = 5 } } [INFO] [2015-01-15 17:24:27,516] [DUDE-myhttpRequestHandler.dispatcher-79] HttpRequestHandler(akka://DUDE): Accepted new connection from / 127.0.0.1:54046 What am I missing? Thanks, Randy Fox -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Dispatcher assignment issue for Http Server
Hi Randy, What makes you say it doesn't respect your core pool size? -- Cheers, √ On 16 Jan 2015 02:34, Randy Fox randy@connexity.com wrote: I am trying to assign a dispatcher to be used for my http server (using the new Akka HTTP Server) and can’t get it to honor my pool size settings. Logs show it is using the dispatcher, but visualvm shows it is not using the core pool size settings. Loos like it might be using the defaults for a thread-pool-executor. All I did was modify the example: 1. import akka.http.Http 2. import akka.stream.FlowMaterializer 3. 4. implicit val system = ActorSystem() 5. implicit val materializer = FlowMaterializer(*MaterializerSettings*( *system*).withDispatcher(*“myhttpRequestHandler.dispatcher*)) 6. 7. val serverBinding = Http(system).bind(interface = localhost, port = 8080) 8. serverBinding.connections.foreach { connection = // foreach materializes the source 9. println(Accepted new connection from + connection.remoteAddress) 10. } 11. … myhttpRequestHandler { dispatcher { type = Dispatcher executor = *thread-pool-executor* name = httprequesthandler-dispatcher thread-pool-executor { core-pool-size-min = 100 core-pool-size-factor = 2.0 core-pool-size-max = 100 } throughput = 5 } } [INFO] [2015-01-15 17:24:27,516] [DUDE-myhttpRequestHandler.dispatcher-79] HttpRequestHandler(akka://DUDE): Accepted new connection from / 127.0.0.1:54046 What am I missing? Thanks, Randy Fox -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Handling of Fatal errors
Hi Andrey, if NCDFEs are fine in your app you can always wrap and rethrow to signal that you think it's fine. An Error is a subclass of Throwable that indicates serious problems that a reasonable application should not try to catch. Most such errors are abnormal conditions. The ThreadDeath error, though a normal condition, is also a subclass of Error because most applications should not try to catch it. A method is not required to declare in its throws clause any subclasses of Error that might be thrown during the execution of the method but not caught, since these errors are abnormal conditions that should never occur. - https://docs.oracle.com/javase/6/docs/api/java/lang/Error.html We should most definitely improve the documentation on this, as you note. On Wed, Feb 11, 2015 at 4:53 PM, Andrey andrewkor...@hotmail.com wrote: Hi Viktor, Thank you for your reply! My issue at the moment is not so much lack of any type of guarantees while processing a fatal exception (which indeed can be improved), but the fact that - as the unit test demonstrates - Akka is unable to shut down itself cleanly. Besides, fatal exceptions like NoClassDefFoundError do not make JVM unstable or inconsistent like some others may (the VirtualMachineError subclasses, for example), so in my mind there is no reason for Akka not to try its best and shutdown cleanly under any conditions. Regards Andrey On Wednesday, February 11, 2015 at 4:53:46 AM UTC-8, √ wrote: Hi Andrey, Thanks for your email, I've attempted to elucidate the current semantics but there may definitely be room for improvements. On 10 Feb 2015 17:23, Andrey andrew...@hotmail.com wrote: Hello, I have Akka configured not to exit JVM on fatal errors by setting the akka.jvm-exit-on-fatal-error property to off. In such case, it's expected to shutdown the actor system. As a side note, I find Scala's opinion as to which java.lang.Error are to be treated as fatal and non-fatal to be mildly peculiar. One example would be the OSGi environment where NoClassDefFoundError is not a fatal exception, but on the contrary, is very much recoverable without a JVM restart. Scala disagrees. it is only the catchall (NonFatal) that considers it fatal, if you expect it [NCDFE] in your code, you can/should surround it with a try catch and promote it to a non fatal exception or otherwise deal with the normal case. Generic NCDFEs are definitely not recoverable as they typically signal that the classpath/application is broken. Anyway, back to the problem, Akka seems to recognize the setting correctly and doesn't halt the JVM process. However, when later I call a shutdown() followed by an awaitTermination() on the instance of the actor system whose actor has just fatally failed, the awaitTermination() call never returns (blocks forever). Also, neither the actor that has thrown the Error, nor its parents (up the supervision tree) get called postStop(). It looks almost like if Akka forgets to stop the failed actor and its immediate parent is left waiting forever unable to proceed with its own shutdown. In the face of a fatal error no guarantees can be made as to the consistency of the JVM nor the data within it, as such we recommend to forcibly shut down the JVM. Please note that the siblings of the failed actor (as well as all other unrelated actors) do get their postStop() invoked. Since `postStop` is invoked in the -instance- that produced a fatal failure I guess there's the philosophical argument to be had whether it can attempt to stop itself or not. Although I could not find anything relevant in Akka documentation, my expectation is that postStop() of all actors is always invoked during actor system shutdown. That is not a guarantee that can be made nor kept on the JVM as thread interruption is cooperative and thread stop is undefined. Below is a simple unit test that demonstrates the problem. Thanks you for your help. Andrey package example; import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import com.google.common.util.concurrent.SettableFuture; import com.typesafe.config.ConfigFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import scala.concurrent.duration.Duration; public class AkkaUtilTest { ActorSystem akka; @Before public void setUp() throws Exception { akka = ActorSystem.create(test, ConfigFactory.parseString( akka.jvm-exit-on-fatal-error=off)); } @After public void tearDown() throws Exception { akka.shutdown(); // This call never returns. akka.awaitTermination(Duration.create(5000, TimeUnit.MILLISECONDS)); } @Test(timeout = 2000) public void testError() throws Exception { SettableFuture? result =
Re: [akka-user] [Akka-2.3.7][Scala] - akka.remote.EndpointWriter
Hi Yarden, please upgrade to the latest bugfix release for the given major version (2.3.9 for 2.3 for instance). http://akka.io/news/2015/01/19/akka-2.3.9-released.html On Tue, Jan 27, 2015 at 8:50 AM, Yarden Bar ayash.jor...@gmail.com wrote: Hi All, I recently upgraded my application Akka version from 2.2.4 to 2.3.7. Since then I started seeing messages like this: akka.remote.EndpointWriter Drained buffer with maxWriteCount: 50, fullBackoffCount: 1, smallBackoffCount: 1, noBackoffCount: 0 , adaptiveBackoff: 2000 Google-ing about this subject sent me to a couple of articles in chineese. Can anyone shed some light on this EndpointWriter or the scenarios for getting the above message? Thanks, Yarden -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Simple, local, garbage collected actors
Hi Arek, We used to do it like that about 5 years ago: https://github.com/akka/akka/blob/v0.8.1/akka-core/src/main/scala/actor/Actor.scala#L105 :) On Tue, Jan 27, 2015 at 10:07 AM, Arkadiusz Burdach arek.burd...@gmail.com wrote: Hi all. What do you think about an idea of module providing possibility to create actor by *new* operator eg. val actor = new SimpleActor { override def receive: Receive = { case _ = sender() ! OK } } Those actors would be garbage collected so there will be no need to send PoisonPIll them. I've prepared proof of concept: https://github.com/arkadius/akka-simple It will be usefull? Or maybe there is already other way to use this kind of syntax of actors? Arek -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Basic Akka Configuration and parallelism
Hi all, if you want a router, then you need to create a router: 1. ActorRef actorRef = 2. actorSystem.actorOf(FromConfig.getInstance().props(Props.create( MyActor.class)), 3. myActor); On Wed, Jan 28, 2015 at 3:24 PM, Björn Antonsson bjorn.antons...@typesafe.com wrote: Hi Cos, Your single actor will process one single message at a time, in sequence. That is a fundamental principle of actors. If you want to have things happening in parallel then you need to have mutliple actors. B/ On 28 January 2015 at 15:21:14, Cosmin Marginean (cosmin...@gmail.com) wrote: We're running Akka 2.3.7 (Java) and I'm now working on reconfiguring to scale for more throughput (as it apparently isn't happening). I am struggling with the Akka documentation as well as some of the examples out there as there is always some contextual information that seems to be missing. However, in order to make sure I'm not going mad, I've extracted the code and config in a dead-simple unit test (I even removed akka's JavaTestKit). The code and config here suggest that there should be a lot of messages processed in parallel, however the entire processing is totally serialised and I can't understand where have I failed in this setup. Any suggestion would be helpful. Thank you Cos @Test public void testAkka() throws Exception { Config cfg = ConfigFactory.load(test-akka.conf); ActorSystem actorSystem = ActorSystem.create(main, cfg.getConfig(main)); ActorRef actorRef = actorSystem.actorOf(Props.create(MyActor.class), myactor); for (int i = 0; i 150; i++) { actorRef.tell(nothing, ActorRef.noSender()); } Thread.sleep(100); } public static final class MyActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { System.out.println(Doing stuff); Thread.sleep(2000); } } And the test-akka.conf file main { app-dispatcher { type = Dispatcher executor = fork-join-executor fork-join-executor { parallelism-min = 16 parallelism-factor = 32.0 parallelism-max = 512 } throughput = 1 } akka.actor.deployment { /myactor { dispatcher = app-dispatcher router = round-robin-pool nr-of-instances = 16 } } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Björn Antonsson Typesafe http://typesafe.com/ – Reactive Apps on the JVM twitter: @bantonsson http://twitter.com/#!/bantonsson -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Basic Akka Configuration and parallelism
Yes, we chose it to be non transparent to use routers or not since it fundamentally changes semantics. -- Cheers, √ On 28 Jan 2015 16:09, Cosmin Marginean cosmin...@gmail.com wrote: Right, I got it now. So that needs to be explicit about using a router in the code using the actor. So this is not entirely enough on its own /myactor { ... router = round-robin-pool Thanks, at least I know that now. Cos On Wednesday, 28 January 2015 14:27:10 UTC, √ wrote: Hi all, if you want a router, then you need to create a router: 1. ActorRef actorRef = 2. actorSystem.actorOf(FromConfig.getInstance().props(Props.create( MyActor.class)), 3. myActor); On Wed, Jan 28, 2015 at 3:24 PM, Björn Antonsson bjorn.a...@typesafe.com wrote: Hi Cos, Your single actor will process one single message at a time, in sequence. That is a fundamental principle of actors. If you want to have things happening in parallel then you need to have mutliple actors. B/ On 28 January 2015 at 15:21:14, Cosmin Marginean (cosm...@gmail.com) wrote: We're running Akka 2.3.7 (Java) and I'm now working on reconfiguring to scale for more throughput (as it apparently isn't happening). I am struggling with the Akka documentation as well as some of the examples out there as there is always some contextual information that seems to be missing. However, in order to make sure I'm not going mad, I've extracted the code and config in a dead-simple unit test (I even removed akka's JavaTestKit). The code and config here suggest that there should be a lot of messages processed in parallel, however the entire processing is totally serialised and I can't understand where have I failed in this setup. Any suggestion would be helpful. Thank you Cos @Test public void testAkka() throws Exception { Config cfg = ConfigFactory.load(test-akka.conf); ActorSystem actorSystem = ActorSystem.create(main, cfg.getConfig(main)); ActorRef actorRef = actorSystem.actorOf(Props.create(MyActor.class), myactor); for (int i = 0; i 150; i++) { actorRef.tell(nothing, ActorRef.noSender()); } Thread.sleep(100); } public static final class MyActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { System.out.println(Doing stuff); Thread.sleep(2000); } } And the test-akka.conf file main { app-dispatcher { type = Dispatcher executor = fork-join-executor fork-join-executor { parallelism-min = 16 parallelism-factor = 32.0 parallelism-max = 512 } throughput = 1 } akka.actor.deployment { /myactor { dispatcher = app-dispatcher router = round-robin-pool nr-of-instances = 16 } } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Björn Antonsson Typesafe http://typesafe.com/ – Reactive Apps on the JVM twitter: @bantonsson http://twitter.com/#!/bantonsson -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this
Re: [akka-user] [akka-stream] Prebuilt Source with simple synchronous API
Hi Alexey, On Sat, Jan 24, 2015 at 11:20 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote: Hey hakkers, I wonder why there is not such prebuilt Source that provides API to externally emit message by simple method call. I am talking about something like this: //building and starting flow val flow = ??? val source = ExternalSource[String] source.via(flow).to(BlackholeSink).run //using source source.emit() What would happen if it can't? source.complete() source.error(new Exception) It can be implemented manually using actor, buffer inside it and exposing some object that internally sends message to this actor. I think this source can be very useful in case of integration with external API. I agree, I think something like this could be quite useful, the problem is getting the API right. Thanks p.s. Sure, we can not directly use ExternalSource to emit objects, but should use some object created during flow materialization, like PropSource does. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?
Hi Allen, I'd suspect the reason that it works well with Akka Streams is that they have back-pressure while your actor solution does not (you'll send 40 million messages as fast as you can, but the actor processing them might not be able to keep up) On Fri, Jan 9, 2015 at 10:53 PM, Allen Nie aiming...@gmail.com wrote: Hey Viktor, I'm trying to use Akka to parallelize this process. There shouldn't be any bottleneck, and I don't understand why I got memory overflow with my first version (actor version). The main task is to read in a line, break it up, and turn each segments (strings) into an integer, then prints it out to a CSV file (vectorization process). def processLine(line: String): Unit = { val vector: ListBuffer[String] = ListBuffer() val segs = line.split(,) println(segs(0)) (1 to segs.length - 1).map {i = val factorArray = dictionaries(i-1) vector += factorArray._2.indexOf(segs(i)).toString //get the factor level of string } timer ! OneDone printer ! Print(vector.toList)} When I'm doing this in pure Akka (with actors), since I created 40 million objects: Row(line: String), I get memory overflow issue. If I use Akka-stream, there is no memory overflow issue, but the performance is too similar to the non-parallelized version (even slower). It's my first time using Akka-stream. So I'm unfamiliar with the optimization you were talking about. Sincerely, Allen On Friday, January 9, 2015 at 4:03:13 PM UTC-5, √ wrote: Hi Allen, What's the bottleneck? Have you tried enabling the experimental optimizations? On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie aimi...@gmail.com wrote: Thank you Soumya, I think Akka-streams is the way to go. However, I would also appreciate some performance boost as well - still have 40 million lines to go through! But thanks anyway! On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote: I would recommend using the Akka-streams API for this. Here is sample. I was able to process a 1G file with around 1.5 million records in *20MB* of memory. The file read and the writing on the console rates are different but the streams API handles that. This is not the fastest but you at least won't run out of memory. https://lh6.googleusercontent.com/-zdX0n1pvueE/VLATDja3K4I/v18/BH7V1RAuxT8/s1600/1gb_file_processing.png import java.io.FileInputStream import java.util.Scanner import akka.actor.ActorSystem import akka.stream.{FlowMaterializer, MaterializerSettings} import akka.stream.scaladsl.Source import scala.util.Try object StreamingFileReader extends App { val inputStream = new FileInputStream(/path/to/file) val sc = new Scanner(inputStream, UTF-8) implicit val system = ActorSystem(Sys) val settings = MaterializerSettings(system) implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256)) val fileSource = Source(() = Iterator.continually(sc.nextLine())) import system.dispatcher fileSource.map { line = line //do nothing //in the for each print the line. }.foreach(println).onComplete { _ = Try { sc.close() inputStream.close() } system.shutdown() } } On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote: Hi, I am trying to process a csv file with 40 million lines of data in there. It's a 5GB size file. I'm trying to use Akka to parallelize the task. However, it seems like I can't stop the quick memory growth. It expanded from 1GB to almost 15GB (the limit I set) under 5 minutes. This is the code in my main() method: val inputStream = new FileInputStream(E:\\Allen\\DataScience\\train\\train.csv)val sc = new Scanner(inputStream, UTF-8) var counter = 0 while (sc.hasNextLine) { rowActors(counter % 20) ! Row(sc.nextLine()) counter += 1} sc.close() inputStream.close() Someone pointed out that I was essentially creating 40 million Row objects, which naturally will take up a lot of space. My row actor is not doing much. Just simply transforming each line into an array of integers (if you are familiar with the concept of vectorizing, that's what I'm doing). Then the transformed array gets printed out. Done. I originally thought there was a memory leak but maybe I'm not managing memory right. Can I get any wise suggestions from the Akka experts here?? http://i.stack.imgur.com/yQ4xx.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at
Re: [akka-user] Sharing message queue between two akka actors?
Hi Krishna! On Sat, Jan 10, 2015 at 5:16 PM, Krishna Kadam shrikrishna.kad...@gmail.com wrote: HI all akka experts, I have following questions for you 1. Is it possible to share message queue between two akka actors? Yes and no, there's a BalancingRouter, but it's only for that one. 2. Is there any effect of increasing number of dispatchers on the message processing rate of akka actors? Having multiple dispatchers is about bulkheading different actor subtrees, not about processing rate (throughput). 3. What are the factors that affect the rate of message processing using akka Actors? Short answer: Little's Law Long answer: Depending where the bottleneck is, you may want to tune dispatcher settings (throughput iso fairness, backing executor service, number of threads and mailbox implementation). *Thanks Regards * *Shrikrishna Kadam* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: UnboundedPriorityMailbox breaks message ordering?
Hi David, yes, I can definitely understand that it can be surprising, but I wouldn't call it a bug -per se-, since it is not a promise that was violated. If you happen to have, or come by, a performant version of a PriorityQueue with the semantics you described, please don't hesitate to share it. On Fri, Jan 9, 2015 at 7:21 PM, David Hotham david.hot...@googlemail.com wrote: It occurs to me that I wasn't completely clear: - Of course the priority mailbox must break message ordering in some general sense. Else it wouldn't be a priority mailbox at all! - But it is highly surprising to me that it should break ordering for messages of equal priority between a sender-receiver pair. On Friday, 9 January 2015 17:58:40 UTC, David Hotham wrote: Hi, We've been tracking down a bug in which reading from a TCP stream was getting all messed up. It turns out that we see the problem only when our actor handling Tcp.Received messages is using an UnboundedPriorityMailbox; the default mailbox doesn't exhibit any problem. I believe that the issue is that the UnboundedPriorityMailbox is backed by a PriorityBlockingQueue; and that, per the documentation for that class, Operations on this class make no guarantees about the ordering of elements with equal priority. That is, it seems that the UnboundedPriorityMailbox breaks the guarantee of message ordering per sender–receiver pair. In our case, the result being that different chunks of the TCP data stream arrive not in the order in which they were read from the wire. Does this analysis seem to be correct? If yes, is it a bug? Would you like me to raise a ticket? Thanks! David -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: UnboundedPriorityMailbox breaks message ordering?
On Fri, Jan 9, 2015 at 9:40 PM, David Hotham david.hot...@googlemail.com wrote: Of course it's normal and expected that a PriorityQueue returns equal priority elements in arbitrary order. That's just how heaps work. However that doesn't imply that a mailbox has to do the same thing! Absolutely, the reordering is not a requirement :) However, silently switching to a new, non-reordering-of-same-priority implementation would risk breaking existing code (that for some reason relies on this behavior). For instance, I guess that it shouldn't be very hard for the mailbox to maintain a sequence number that it associates with each message. Sure! The question is how how big the impact is on memory consumption and performance. Then it could order messages first according to the comparator that it currently uses, and second according to the sequence number. That way it would return equal priority messages in the same order that it saw them arrive - which gives exactly the property that I'd hoped for. Absolutely :) I can't tell from your reply whether you think that maintaining order for equal priority messages is desirable or not. I think it could be interesting as an alternative to the current version of the PriorityMailbox, or perhaps even as a replacement if it is performing as well as the thing it replaces. - If yes, what do you think about an implementation along these lines? Is it worth me raising an issue at github? I wish I had time to do that, is it something you'd be willing/able to take a stab at? Would love to see something like that. - If no, can I at least encourage the Akka maintainers to add an explicit note to the documentation, so that other users are less likely to fall into my error? I think this should be done anyway, since it as you said, and I confirmed, surprising at first. :) Cheers, David On Friday, 9 January 2015 19:43:27 UTC, √ wrote: Hi David, yes, I can definitely understand that it can be surprising, but I wouldn't call it a bug -per se-, since it is not a promise that was violated. If you happen to have, or come by, a performant version of a PriorityQueue with the semantics you described, please don't hesitate to share it. On Fri, Jan 9, 2015 at 7:21 PM, David Hotham david@googlemail.com wrote: It occurs to me that I wasn't completely clear: - Of course the priority mailbox must break message ordering in some general sense. Else it wouldn't be a priority mailbox at all! - But it is highly surprising to me that it should break ordering for messages of equal priority between a sender-receiver pair. On Friday, 9 January 2015 17:58:40 UTC, David Hotham wrote: Hi, We've been tracking down a bug in which reading from a TCP stream was getting all messed up. It turns out that we see the problem only when our actor handling Tcp.Received messages is using an UnboundedPriorityMailbox; the default mailbox doesn't exhibit any problem. I believe that the issue is that the UnboundedPriorityMailbox is backed by a PriorityBlockingQueue; and that, per the documentation for that class, Operations on this class make no guarantees about the ordering of elements with equal priority. That is, it seems that the UnboundedPriorityMailbox breaks the guarantee of message ordering per sender–receiver pair. In our case, the result being that different chunks of the TCP data stream arrive not in the order in which they were read from the wire. Does this analysis seem to be correct? If yes, is it a bug? Would you like me to raise a ticket? Thanks! David -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ:
Re: [akka-user] Re: How to Properly Manage Akka's Memory Usage with 40 Million Lines Task?
Hi Allen, What's the bottleneck? Have you tried enabling the experimental optimizations? On Fri, Jan 9, 2015 at 9:52 PM, Allen Nie aiming...@gmail.com wrote: Thank you Soumya, I think Akka-streams is the way to go. However, I would also appreciate some performance boost as well - still have 40 million lines to go through! But thanks anyway! On Friday, January 9, 2015 at 12:43:49 PM UTC-5, Soumya Simanta wrote: I would recommend using the Akka-streams API for this. Here is sample. I was able to process a 1G file with around 1.5 million records in *20MB* of memory. The file read and the writing on the console rates are different but the streams API handles that. This is not the fastest but you at least won't run out of memory. https://lh6.googleusercontent.com/-zdX0n1pvueE/VLATDja3K4I/v18/BH7V1RAuxT8/s1600/1gb_file_processing.png import java.io.FileInputStream import java.util.Scanner import akka.actor.ActorSystem import akka.stream.{FlowMaterializer, MaterializerSettings} import akka.stream.scaladsl.Source import scala.util.Try object StreamingFileReader extends App { val inputStream = new FileInputStream(/path/to/file) val sc = new Scanner(inputStream, UTF-8) implicit val system = ActorSystem(Sys) val settings = MaterializerSettings(system) implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = 256, initialInputBufferSize = 256)) val fileSource = Source(() = Iterator.continually(sc.nextLine())) import system.dispatcher fileSource.map { line = line //do nothing //in the for each print the line. }.foreach(println).onComplete { _ = Try { sc.close() inputStream.close() } system.shutdown() } } On Friday, January 9, 2015 at 10:53:33 AM UTC-5, Allen Nie wrote: Hi, I am trying to process a csv file with 40 million lines of data in there. It's a 5GB size file. I'm trying to use Akka to parallelize the task. However, it seems like I can't stop the quick memory growth. It expanded from 1GB to almost 15GB (the limit I set) under 5 minutes. This is the code in my main() method: val inputStream = new FileInputStream(E:\\Allen\\DataScience\\train\\train.csv)val sc = new Scanner(inputStream, UTF-8) var counter = 0 while (sc.hasNextLine) { rowActors(counter % 20) ! Row(sc.nextLine()) counter += 1} sc.close() inputStream.close() Someone pointed out that I was essentially creating 40 million Row objects, which naturally will take up a lot of space. My row actor is not doing much. Just simply transforming each line into an array of integers (if you are familiar with the concept of vectorizing, that's what I'm doing). Then the transformed array gets printed out. Done. I originally thought there was a memory leak but maybe I'm not managing memory right. Can I get any wise suggestions from the Akka experts here?? http://i.stack.imgur.com/yQ4xx.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] calling code that uses jsr166y
Hi Tim, is it using ForkJoinTask/RecursiveTask directly? On Wed, Jan 7, 2015 at 12:23 PM, Tim Pigden tim.pig...@optrak.com wrote: Hi I'm going to be calling some Java multi-threaded code that is currently using JSR166Y (due to backward compatibility requirements with some legacy java 6 code). My Java developer wants to know if he should do a branch to use java7 (or 8) compatible fork join pool instead The intention is I call this from within an actor and possibly from within multiple actors (i.e. simultaneously). So I could have 2 or 3 actors each calling this code (a fairly long lived dynamic program at up to 500ms) which itself is using fork join to parallel process with a fairly low number of threads. Is there a recommendation of best practice here? Tim -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Persistence on the Query Side: The Conclusion
On Fri, Jan 9, 2015 at 11:56 AM, Greg Young gregoryyou...@gmail.com wrote: Usually it comes down to the realization that the computer is not the book of record. One of my favourites was being asked to build a fully consistent inventory system. I generally like to approach things with questions, the one i had was 'sure but how do we get people who are stealing stuff to appropriately check it out?' +Long.MAX_VALUE on this. Building warehouse management systems is very eye-opening. On 9 Jan 2015 12:02, Sebastian Bach sebastian.tomasz.b...@gmail.com wrote: Thank you Greg. The mind shift from a preventive to a reactive workflow is not easy for users (humans), because it requires a change of habits. For many people computer systems are kind of authoritative. There is this wrong assumption (from early days of computation?) that a computer accepts only a valid input or returns an error. It was then only black or white, the golden era of transactions. But this was (as you pointed out) always to some degree an hypocrisy. Now we have this shades of gray and many users feel unsettled. This holds true for any kind of resource allocation application and the overbooking (or wrong booking) problem. Some of the users define taking workload off of them as avoiding of planning mistakes, like commit and forget. But the actual workflow seems to shift towards an iterative process of human-computer interaction, to some kind of react-react ping-pong. Best Regards Sebastian W dniu środa, 7 stycznia 2015 22:15:42 UTC+1 użytkownik Greg Young napisał: The consistency of the query model should be achieved as soon as possible and close to real-time. It really depends on the domain. I have worked in many situations where the data in question would be perfectly fine updated once per month. (e.g. adding a sold out item to the shopping cart). This is a funny example because it shows not that you need to update read models more quickly but that you need to get the whole business on board. Remember that computer systems are normally part of a larger system fulfilling business needs. It really is a mind shift moving to eventual consistency. In the example of adding a sold out item... why stop it? Does it matter that we don't have any of this item? The real question is how quickly we can get it and if its worth our while to do so. To be fair 30 years ago these times were much much higher than what we talk about today and yet businesses still managed to work their way through things. For many of these types allowing things to go incorrectly is actually a good thing (overbooked seats on an airline, overdraft charges at banks...). To really be benefiting from eventual consistency the whole business process must recognize it. In terms of handling failures they are normally handled in a reactive not a preventative manner (like most business problems). Detect the failure, let a human deal with it. At the end of the day the primary role of the computer system is to take workload off of humans. You will hit the law of diminishing returns. dont try to solve every problem :) Greg On Wed, Jan 7, 2015 at 11:07 PM, Sebastian Bach sebastian@gmail.com wrote: Hi Roland, one thing to keep in mind in the CQRS/ES architecture is that not only the query side depends on the command side (by following the event stream) but also the command side depends on the query side for validation of complex business rules. This has a deep impact on correctness and throughput. Validation checks on an potentially outdated query model in an eventually consistent architecture is a hard problem (e.g. adding a sold out item to the shopping cart). The consistency of the query model should be achieved as soon as possible and close to real-time. A PersistentView in Akka has a default of 5s? On the other hand the speed of validation depends on the speed of the queries. And the throughput depends on the validation speed. Thus, queries directly on the whole event stream are less useful than persistent projections. Keep up the good work :) Cheers Sebastian W dniu wtorek, 7 października 2014 07:32:20 UTC+2 użytkownik rkuhn napisał: Hi Vaughn, from our side nothing has happened yet: my conclusion is that this thread contains all the information we need when we start working on this. The reason why we are waiting is that this work will depend heavily upon Akka Streams and therefore we are finishing those first, which should take roughly one month. Meanwhile, if use cases come up which could be used to refine the plans, please point them out here so that we can take all the inputs into account. Regards, Roland 6 okt 2014 kl. 20:09 skrev Vaughn Vernon vve...@shiftmethod.com: Hi Roland, I's been a month this the last update on this and I have lost track of the status. Can you provide an update on where this stands?
Re: [akka-user] Re: UnboundedPriorityMailbox breaks message ordering?
On Wed, Jan 14, 2015 at 2:44 PM, Endre Varga endre.va...@typesafe.com wrote: On Fri, Jan 9, 2015 at 10:02 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Fri, Jan 9, 2015 at 9:40 PM, David Hotham david.hot...@googlemail.com wrote: Of course it's normal and expected that a PriorityQueue returns equal priority elements in arbitrary order. That's just how heaps work. However that doesn't imply that a mailbox has to do the same thing! Absolutely, the reordering is not a requirement :) However, silently switching to a new, non-reordering-of-same-priority implementation would risk breaking existing code (that for some reason relies on this behavior). I disagree, previously the order between same priority entries was undefined, and undefined includes that the order is unchanged (the new behavior). Just my 2c. The proposed solution has worse performance than the current though. That would be a regression. -Endre For instance, I guess that it shouldn't be very hard for the mailbox to maintain a sequence number that it associates with each message. Sure! The question is how how big the impact is on memory consumption and performance. Then it could order messages first according to the comparator that it currently uses, and second according to the sequence number. That way it would return equal priority messages in the same order that it saw them arrive - which gives exactly the property that I'd hoped for. Absolutely :) I can't tell from your reply whether you think that maintaining order for equal priority messages is desirable or not. I think it could be interesting as an alternative to the current version of the PriorityMailbox, or perhaps even as a replacement if it is performing as well as the thing it replaces. - If yes, what do you think about an implementation along these lines? Is it worth me raising an issue at github? I wish I had time to do that, is it something you'd be willing/able to take a stab at? Would love to see something like that. - If no, can I at least encourage the Akka maintainers to add an explicit note to the documentation, so that other users are less likely to fall into my error? I think this should be done anyway, since it as you said, and I confirmed, surprising at first. :) Cheers, David On Friday, 9 January 2015 19:43:27 UTC, √ wrote: Hi David, yes, I can definitely understand that it can be surprising, but I wouldn't call it a bug -per se-, since it is not a promise that was violated. If you happen to have, or come by, a performant version of a PriorityQueue with the semantics you described, please don't hesitate to share it. On Fri, Jan 9, 2015 at 7:21 PM, David Hotham david@googlemail.com wrote: It occurs to me that I wasn't completely clear: - Of course the priority mailbox must break message ordering in some general sense. Else it wouldn't be a priority mailbox at all! - But it is highly surprising to me that it should break ordering for messages of equal priority between a sender-receiver pair. On Friday, 9 January 2015 17:58:40 UTC, David Hotham wrote: Hi, We've been tracking down a bug in which reading from a TCP stream was getting all messed up. It turns out that we see the problem only when our actor handling Tcp.Received messages is using an UnboundedPriorityMailbox; the default mailbox doesn't exhibit any problem. I believe that the issue is that the UnboundedPriorityMailbox is backed by a PriorityBlockingQueue; and that, per the documentation for that class, Operations on this class make no guarantees about the ordering of elements with equal priority. That is, it seems that the UnboundedPriorityMailbox breaks the guarantee of message ordering per sender–receiver pair. In our case, the result being that different chunks of the TCP data stream arrive not in the order in which they were read from the wire. Does this analysis seem to be correct? If yes, is it a bug? Would you like me to raise a ticket? Thanks! David -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List
Re: [akka-user] Handling of Fatal errors
Hi Andrey, Thanks for your email, I've attempted to elucidate the current semantics but there may definitely be room for improvements. On 10 Feb 2015 17:23, Andrey andrewkor...@hotmail.com wrote: Hello, I have Akka configured not to exit JVM on fatal errors by setting the akka.jvm-exit-on-fatal-error property to off. In such case, it's expected to shutdown the actor system. As a side note, I find Scala's opinion as to which java.lang.Error are to be treated as fatal and non-fatal to be mildly peculiar. One example would be the OSGi environment where NoClassDefFoundError is not a fatal exception, but on the contrary, is very much recoverable without a JVM restart. Scala disagrees. it is only the catchall (NonFatal) that considers it fatal, if you expect it [NCDFE] in your code, you can/should surround it with a try catch and promote it to a non fatal exception or otherwise deal with the normal case. Generic NCDFEs are definitely not recoverable as they typically signal that the classpath/application is broken. Anyway, back to the problem, Akka seems to recognize the setting correctly and doesn't halt the JVM process. However, when later I call a shutdown() followed by an awaitTermination() on the instance of the actor system whose actor has just fatally failed, the awaitTermination() call never returns (blocks forever). Also, neither the actor that has thrown the Error, nor its parents (up the supervision tree) get called postStop(). It looks almost like if Akka forgets to stop the failed actor and its immediate parent is left waiting forever unable to proceed with its own shutdown. In the face of a fatal error no guarantees can be made as to the consistency of the JVM nor the data within it, as such we recommend to forcibly shut down the JVM. Please note that the siblings of the failed actor (as well as all other unrelated actors) do get their postStop() invoked. Since `postStop` is invoked in the -instance- that produced a fatal failure I guess there's the philosophical argument to be had whether it can attempt to stop itself or not. Although I could not find anything relevant in Akka documentation, my expectation is that postStop() of all actors is always invoked during actor system shutdown. That is not a guarantee that can be made nor kept on the JVM as thread interruption is cooperative and thread stop is undefined. Below is a simple unit test that demonstrates the problem. Thanks you for your help. Andrey package example; import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import com.google.common.util.concurrent.SettableFuture; import com.typesafe.config.ConfigFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import scala.concurrent.duration.Duration; public class AkkaUtilTest { ActorSystem akka; @Before public void setUp() throws Exception { akka = ActorSystem.create(test, ConfigFactory.parseString(akka.jvm-exit-on-fatal-error=off)); } @After public void tearDown() throws Exception { akka.shutdown(); // This call never returns. akka.awaitTermination(Duration.create(5000, TimeUnit.MILLISECONDS)); } @Test(timeout = 2000) public void testError() throws Exception { SettableFuture? result = SettableFuture.create(); ActorRef actorRef = akka.actorOf(Props.create(ThrowingActor.class, result)); actorRef.tell(new Object(), ActorRef.noSender()); // The get() call never returns and the test times out. result.get(); } static class ThrowingActor extends UntypedActor { final SettableFuture? stopped; ThrowingActor(SettableFuture? stopped) { this.stopped = stopped; } @Override public void onReceive(Object message) throws Exception { // These cause postStop() never called on this actor and the actor system can't be // gracefully shutdown. //throw new OutOfMemoryError(oh, my!); throw new NoClassDefFoundError(no-class-def); // These work just fine: // throw new AssertionError(assertion); // throw new Error(error); } @Override public void postStop() throws Exception { // Never called. stopped.set(null); } } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this
Re: [akka-user] calling code that uses jsr166y
Hmmm, I've reread your question a couple of times and I am not sure what you are asking, is there something more specific your wonder? (I don't know what goals you have) On Wed, Jan 7, 2015 at 12:40 PM, Tim Pigden tim.pig...@optrak.com wrote: hi - yes On 7 January 2015 at 11:32, Viktor Klang viktor.kl...@gmail.com wrote: Hi Tim, is it using ForkJoinTask/RecursiveTask directly? On Wed, Jan 7, 2015 at 12:23 PM, Tim Pigden tim.pig...@optrak.com wrote: Hi I'm going to be calling some Java multi-threaded code that is currently using JSR166Y (due to backward compatibility requirements with some legacy java 6 code). My Java developer wants to know if he should do a branch to use java7 (or 8) compatible fork join pool instead The intention is I call this from within an actor and possibly from within multiple actors (i.e. simultaneously). So I could have 2 or 3 actors each calling this code (a fairly long lived dynamic program at up to 500ms) which itself is using fork join to parallel process with a fairly low number of threads. Is there a recommendation of best practice here? Tim -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to a topic in the Google Groups Akka User List group. To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/D4eu8JUq2Lk/unsubscribe. To unsubscribe from this group and all its topics, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Tim Pigden Optrak Distribution Software Limited +44 (0)1992 517100 http://www.linkedin.com/in/timpigden http://optrak.com Optrak Distribution Software Ltd is a limited company registered in England and Wales. Company Registration No. 2327613 Registered Offices: Suite 6,The Maltings, Hoe Lane, Ware, SG12 9LR England This email and any attachments to it may be confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily represent those of Optrak Distribution Software Ltd. If you are not the intended recipient of this email, you must neither take any action based upon its contents, nor copy or show it to anyone. Please contact the sender if you believe you have received this email in error. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] calling code that uses jsr166y
Hi Tim, Akka fork-join-executor-based dispatchers uses Scala's embedded jsr166 version of FJ. I don't think it lets you work with FJT though. On Wed, Jan 7, 2015 at 1:16 PM, Tim Pigden tim.pig...@optrak.com wrote: Hi Viktor, Ok sorry - I'm not an expert in these matters - apologies for the vagueness. I'll do some homework and make him read the docs. Dispatcher looks a good place to start. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Surviving VirtualMachineError
Hi Alexandre, unfortunately it does not work like that. Recovering from SOE and OOME is not possible in general, and as such it is better to fail fast and fix the bug instead of limping along in a possibly corrupted state. On Wed, Mar 25, 2015 at 3:13 PM, Alexandre Russel alexan...@russel.fr wrote: Hi all, we're using an actor system in our play application with the default strategies. One of our actor misbehaved and throw a StackOverflowError which shutdown the actor system. The JVM was still running but the actor system was down. I was wondering if it would make sense to catch StackOverflow and OutOfMemory error in our default strategy. We are not keeping state in our actors and once the actor is restarted the stack or memory should be back to working state. What do you think ? Alex -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Stream 1.0M5 : buffersize = 0
Hi Peter, Akka streams is about async stream processing and without a buffer you cannot be async. -- Cheers, √ On 30 Mar 2015 09:22, Peter Schmitz petrischm...@gmail.com wrote: Actually ActorFlowMaterializerSettings is complaining about initialInputBufferSize = 0. What are the reasons you can't disable buffering elements by setting initialInputBufferSize and maxInputBufferSize to zero? I am aware that one of the core features of akka-stream is buffering to gain throughput. But in my use case I like to disable buffering at least for a certain stream-section. Are there other ways to accomplish this? Thanks in advance! Great job, guys! Peter -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Stream 1.0M5 : buffersize = 0
On Mon, Mar 30, 2015 at 10:36 AM, Peter Schmitz petrischm...@gmail.com wrote: I know that, but a section of my Flowgraph (SubFlowGraph) should act like a single stream stage, should act like a single stream stage—from what/whose perspective? that means having only one (possibly large) buffer for that section. The whole section will be used async. Currently each stage in my SubFlowGraph buffers at least one element. Sadly it's not possible to make that section a function f to use Flow[T].map(f). Any ideas? May I ask what the problem with the current behavior is for you? Am Montag, 30. März 2015 10:08:10 UTC+2 schrieb √: Hi Peter, Akka streams is about async stream processing and without a buffer you cannot be async. -- Cheers, √ On 30 Mar 2015 09:22, Peter Schmitz petris...@gmail.com wrote: Actually ActorFlowMaterializerSettings is complaining about initialInputBufferSize = 0. What are the reasons you can't disable buffering elements by setting initialInputBufferSize and maxInputBufferSize to zero? I am aware that one of the core features of akka-stream is buffering to gain throughput. But in my use case I like to disable buffering at least for a certain stream-section. Are there other ways to accomplish this? Thanks in advance! Great job, guys! Peter -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Stream 1.0M5 : buffersize = 0
Assuming that the flow is local you can always skip entries that are old (I'd advise against using wallclock; and nanoTime is local only) on a per stage basis. On Mon, Mar 30, 2015 at 2:57 PM, Endre Varga endre.va...@typesafe.com wrote: Hi Peter, Why don't you just create a custom stage then where you bundle up all the calculations you need to be done synchronously? (Btw, I agree with Roland that you probably over-worry about those buffers) -Endre On Mon, Mar 30, 2015 at 2:54 PM, Peter Schmitz petrischm...@gmail.com wrote: Hi Endre, I am aware of this, and I know I want to disable a core feature of akka stream for a certain section of my flow. That said my actor buffering in front of processing is fine but the in between these two actors processing by my partial flowgraph should be done without buffering due to the staleness issue I mentioned. Peter Am Montag, 30. März 2015 14:39:53 UTC+2 schrieb drewhk: Hi Peter, The issue is, that these processing elements are actors, which have a mailbox, so there is buffering in any case. You cannot have meaningful asynchronous processing without at least a buffer size of one. -Endre On Mon, Mar 30, 2015 at 2:36 PM, Peter Schmitz petris...@gmail.com wrote: Hi Roland, I've tried different OverflowStrategies in front of my partial flow, but none worked... as expected because this doesn't change my problem that there are already stale elements in the processing chain. I guess the chain is about 10 stages long (but user defined and not known in advance) and even with buffer = 1 for each stage there are 10 stale elements since the time buffers have been filled up ( 10 fast inputs). So there is a non acceptable lag on my screen regarding my example. Recently I tried to solve this by using a FlowGraph with one in-actor and one out-actor (PropsSource/PropsSink) passed. All in-elements are piped through my partial flow graph and passed to the out-actor. So outside of the main flowgraph after materialization I connected the out-actor to the in-actor so if the out-actor gets an element the in-actor is notified to send the next element into the processing chain. In addition the in-actor buffers all external incoming events and manages to discard old ones. My remaining problem is, that it is a user defined processing and I can't assume that one input element will generate exactly one output element. So I can determine when processing was done. Am Montag, 30. März 2015 14:06:59 UTC+2 schrieb rkuhn: Hi Peter, 30 mar 2015 kl. 13:30 skrev Peter Schmitz petris...@gmail.com: Hi Roland, thank you for your detailed answer. In my timestamp example timestamps are processed while new timestamps arrive and at this point processing of old timestamps is obsolete but they are somewhere in the processing chain. Of course I can discard them at the end when dealing with timestamps (comparing with the current time) and perhaps (have to think about that) even when not dealing with timestamps but rather with some other inputstream with new incoming elements invalidating the processing of not fully processed elements before they are stucked in the buffers of some stage(s) of my partial flowgraph. A synchronous processing of my partial(!) flowgraph avoids having elements that are obsolete AND not fully processed. I this manner only required elements are processed and CPU load is lower and more important a more realtime outcome. For example, say you have a stream of stock prices and you do some processing in a partial flowgraph and finally you show the result as some remolded value on a screen. If stockprices are faster generated than your processing yielding a screen value can handle you won't see real time prices anymore and if the prices stream pauses emitting at stock exchange closing your screen is still refreshing until all involved intermediate buffers are drained. That won't happen with a certain buffer strategy (DropHead) for the stock prices stream and synchronous processing for the following partial flowgraph. So my program is not broken but it is not realtime and heats up my CPU more than necessary. Delay is inevitable because processing takes time but unnecessary delay of having obsolete elements in internal buffers is evitable. It seems that you are worrying a little too much: in practice all these buffers of size one will ever do is to allow pipelining of the processing of stages, which means that the latency of pushing something through the pipeline should not change much between this minimal buffer and no buffering at all, while the throughput will increase due to the ability of processing multiple steps in parallel. For the use-case you describe I’d recommend just placing a buffer with an adequate dropping strategy in front of your partial flow (I’d try out OverflowStrategy.dropBuffer first) and see if that is not good enough. Fusing stages together may benefit latency
Re: [akka-user] Re: Memory leak somehow related to ask pattern
Wow, what an amazing catch! On Tue, Mar 31, 2015 at 1:01 AM, Konrad Malawski konrad.malaw...@typesafe.com wrote: Hi guys, quick update on the topic. It has indeed uncovered a bug in the PromiseActorRef. It does handle sending Termination properly, but did so with a mistaken actor in the Terminated(…) message, thus you wouldn’t get the Terminated you were waiting for, and the above explanation of Yaroslav completes the rest of the story. The fix is in this PR since today morning: https://github.com/akka/akka/pull/17102 https://github.com/akka/akka/pull/17102: Thanks for notifying us that something is fishy around there! -- Cheers, Konrad 'ktoso’ Malawski Akka http://akka.io @ Typesafe http://typesafe.com On 29 March 2015 at 09:33:49, Yaroslav Klymko (t3h...@gmail.com) wrote: So PrmiseActorRef is leaking and keeps reference on Promise WriteEventsCompleted, still I'm not able to find a combination of dispatchers/actors to reproduce on simple example. On Saturday, March 28, 2015 at 9:33:15 PM UTC+2, Yaroslav Klymko wrote: Hi guys, I have a very strange problem, it was originally spotted by @gfduszynski https://github.com/EventStore/EventStore.Akka.Persistence/issues/11 Here is a problematic line of code https://github.com/EventStore/EventStore.Akka.Persistence/ blob/v2.0.0/src/main/scala/akka/persistence/eventstore/ journal/EventStoreJournal.scala#L28 Basically it is a *actor ? WriteEvents* which returns *Future[WriteEventsCompleted]*, these *WriteEventsCompleted* messages are leaking, however *actor ! WriteEvents* is not leaking... Also I couldn't reproduce this using simple app without akka persistence. I will really appreciate any help, any ideas. Thanks! https://lh3.googleusercontent.com/-GswiQ2lLtTU/VRcAmBy7mDI/MHE/0fjbSplbpK4/s1600/Screenshot%2Bat%2BMar%2B28%2B21-26-50.png https://lh3.googleusercontent.com/-iBvp3mltxVs/VRcArTBLSpI/MHM/1E9raW_PKnI/s1600/Screenshot%2Bat%2BMar%2B28%2B21-26-35.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: how to tune akka remoting performance
Sounds like you're using too many FJ threads. On Mon, Mar 30, 2015 at 6:36 PM, Ivan Balashov ibalas...@gmail.com wrote: While running the test I see that interrupts and context switches are rather high (for a 4 cpu core box) r b swpd free buff cache si sobibo in cs us sy id wa 5 0 0 2083556 11296 39838000 0 0 15491 39242 58 21 21 0 1 0 0 2110728 11304 39838000 0 7 17791 42876 50 21 29 0 0 0 0 2114272 11304 39838000 0 0 22580 47400 34 15 51 0 3 0 0 2132924 11304 39838000 0 4 26861 53711 26 10 65 0 0 0 0 2119648 11304 39838000 0 2 27225 54133 26 10 64 0 Trying to adjust akka.remote.default-remote-dispatcher.throughput did not have any noticeable effect. Could anyone suggest how to reduce context switches with akka remoting in case of many small messages? Thanks, On Monday, March 30, 2015 at 1:38:40 PM UTC+3, Ivan Balashov wrote: Hi, Running simple Ping-Pong test (http://goo.gl/6gyWsd) between two JVMs on same box (4 Cores, tested both on my laptop and GCE) gives me total throughput 20K msg/sec, which are a CPU bound processes on both sides. Any ideas how you could manage 60K with almost idle CPU? Thanks, On Saturday, August 9, 2014 at 5:08:18 PM UTC+3, Sean Zhong wrote: For Akka 2.3.4, 2 machines, GbE network, one machine contains a sender actor, the other contains a reciver actor. Sender actor will continuously send message (100 bytes each, pre-created in memory) to receiver actor with simple flow control based on sliding window and acking(one ack per 100 messages). I can only get throughput about 60K messages/second , which is much slower than my expectation. The CPU usage is almost idle(3%). Network usage is about 20% of the bandwidth. I use kryo serialization, and singleConsumerUnboundedMailBox, 4 local dispatcher, 4 remote dispatcher, default setting for netty. Is there performance baseline number for akka remoting in your smoke test? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: how to tune akka remoting performance
Could you share your entire config? On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov ibalas...@gmail.com wrote: On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: Sounds like you're using too many FJ threads. I wish it was that simple. For both remoting and actor pool the same dispatcher is used (4 core box): type = Dispatcher executor = fork-join-executor throughput = 1000 // Does this apply to FJ, or only to ThreadPoolEx? fork-join-executor { parallelism-min = 1 parallelism-max = 4 } I get lower CS values if I set parallelism-max=1, maintaining about the same total throughput only with less cpu burn. However, it looks like CS should not depend much on whether we have 1 or 4 or 10 FJ threads, most switching must be happing on deeper level, e.g. controlled by `throughput`. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: how to tune akka remoting performance
Looks like you're using Java Serialization, a good first start would be to switch serializers for your messages. On Wed, Apr 1, 2015 at 3:49 PM, Ivan Balashov ibalas...@gmail.com wrote: Viktor, Here is the latest configuration that allowed to me reach 30msg/sec between 2 x 4cpu hosts in GCE network. http://goo.gl/sPnjQS On both sides context switches jumped to 30-50K/sec Client: 0 0 0 1528652 94396 79422800 0 4 468 772 0 0 100 0 2 0 0 1533988 94408 79422400 0 5 9051 16555 11 4 85 0 1 0 0 1326560 94412 79422400 0 6 28710 52792 34 13 54 0 2 0 0 1326188 94412 79422800 0 3 30209 59439 26 8 66 0 1 0 0 1325444 94420 79422800 0 4 26194 51556 24 6 70 0 1 0 0 1328172 94424 79422800 0 1 468 725 0 0 100 0 Server: 3 0 0 827812 30372 206631600 0 3 9762 13188 48 7 45 0 1 0 0 564080 30388 206631600 0 3 19969 23901 29 13 59 0 4 0 0 416900 30388 206631600 0 0 24864 31988 39 13 48 0 1 0 0 241876 30388 206631600 0 3 24113 34808 42 11 47 0 4 0 0 145280 30404 201443200 0 2 21127 40357 44 6 50 0 2 0 0 106608 30404 196002400 0 0 19162 36545 47 6 47 0 4 0 0 138560 30328 186709200 0 6 14574 27803 57 4 39 0 2 0 0 293616 30336 186709200 0 4 6390 10023 42 12 46 0 Couple more things that concerned me: 1) [WARN] [04/01/2015 12:54:26.125] [systemB-network-dispatcher-5] [akka.tcp:// systemB@10.110.112.155:2001/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsystemA%4010.110.47.70%3A2001-0/endpointWriter] [ *473494*] buffered messages in EndpointWriter for [akka.tcp:// systemA@10.110.47.70:2001]. *You should probably implement flow control to avoid flooding the remote connection.* Looks like either side is sending more messages than receiving is capable to accept. This however, might be cause by 1M network buffer, which, OTOH is needed for higher throughput. jFTR, GCE network is quite fast (my last measurement gives me 72Mb/sec, while during the test we barely hit 10Mb/sec). 2) Significant heap pressure, apparently caused by 1) and, as a consequence, some GC activity. Profiler gives me estimate of ~700bytes per every message in the queue, quickly growing heap. The slower messages get processed, the more of them are accumulated, the more cpu is needed for GC, chicken-egg. Any configuration advice to achieve better throughput in this scenario of many small messages? On Wednesday, April 1, 2015 at 10:59:32 AM UTC+3, √ wrote: Could you share your entire config? On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov ibal...@gmail.com wrote: On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: Sounds like you're using too many FJ threads. I wish it was that simple. For both remoting and actor pool the same dispatcher is used (4 core box): type = Dispatcher executor = fork-join-executor throughput = 1000 // Does this apply to FJ, or only to ThreadPoolEx? fork-join-executor { parallelism-min = 1 parallelism-max = 4 } I get lower CS values if I set parallelism-max=1, maintaining about the same total throughput only with less cpu burn. However, it looks like CS should not depend much on whether we have 1 or 4 or 10 FJ threads, most switching must be happing on deeper level, e.g. controlled by `throughput`. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ:
Re: [akka-user] Many concurrent network connections
Hi Haddock, First, let me say that your name is awesome. Secondly, have a look here: https://typesafe.com/blog/qa-with-caoyuan-deng-akka-at-wandoujia -- Cheers, √ On 25 Feb 2015 20:35, Haddock ffm2...@web.de wrote: Hello, I have a questions concerning holding many network connections and whether Akka can handle this. Let's say a system needs to be able to hold 50.000 network connections at the same time. So there are 50.000 incoming connections and 50.000 outgoing connections. In a conventional system this would mean that 100.000 threads need to be spawned. Obviously, that many threads cannot be created on the JVM (I know Erlang can do this). My question is now whether Akka can do this or whether I need to move some load balancer in place like Vert.x or Erlang or something. Thanks, Haddock -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Delaying futures?
Hi John, try this signature instead: http://doc.akka.io/japi/akka/2.3.9/akka/pattern/Patterns.html#after(scala.concurrent.duration.FiniteDuration, akka.actor.Scheduler, scala.concurrent.ExecutionContext, java.util.concurrent.Callable) On Thu, Jan 29, 2015 at 10:34 PM, John Ulric uja...@gmail.com wrote: Is there a way to delay the execution (not the completion) of a future, similar to the scheduleOnce method for actors? I tried this: FutureString delayedString = Patterns.after(new FiniteDuration(10, TimeUnit.SECONDS), system.scheduler(), system.dispatcher(), Futures.future(new CallableString() { public String call() throws Exception { System.out.println(Returning some string immediately); return Some string.; } }, system.dispatcher())); System.out.println(delayedString= + Await.result(delayedString, Duration.Inf())); But this calls the Callable immediately and only waits 10 seconds for the result to be returned. Thanks, John -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka io udp broadcast to subnet
Hi, 255.255.255.255 has been deprecated/discouraged for many many years. Related: http://stackoverflow.com/questions/7481332/udp-broadcast-on-java-doesnt-work On Thu, Apr 2, 2015 at 10:27 PM, saumitra.srivast...@gmail.com wrote: I am trying to use akka-io to broadcast a UDP packet to all the nodes in my subnet? I am following up the scala example from http://doc.akka.io/docs/akka/snapshot/scala/io-udp.html Its not working if I am using 255.255.255.255. Although its working fine when I give direct IP - 10.172.137.125 I tried the same task using java.net.DatagramSocket and it is working perfectly, i.e. i can receive the broadcast message on all subnet nodes, when sending using 255.255.255.255. So that means that there are no networking issues between machines. But if possible I want to do it using akka-io itself? I am new to network programming. Any ideas what I might be doing wrong? Let me know what more info can I provide to identify the issue. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Akka Stream is not running (without compiler error)
I suspect your problem is that your test exits before the stream has a chance to run. On Thu, Apr 2, 2015 at 11:56 PM, Viktor Klang viktor.kl...@gmail.com wrote: Works on my machine: scala import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer scala import akka.stream._ import akka.stream._ scala import akka.stream.scaladsl._ import akka.stream.scaladsl._ scala implicit val sys = ActorSystem(repl) sys: akka.actor.ActorSystem = akka://repl scala import sys.dispatcher import sys.dispatcher scala implicit val fm = ActorFlowMaterializer() fm: akka.stream.ActorFlowMaterializer = ActorFlowMaterializerImpl(ActorFlowMaterializerSettings(4,16,,function1,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,Optimizations(false,false,false,false)),akka.dispatch.Dispatchers@3dff9cf6 ,Actor[akka://repl/user/$b#575469743],1,flow,Optimizations(false,false,false,false)) scala val source: Source[Int, Unit] = Source(1 to 10) source: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@4d8aca2f scala val printSink = Sink.foreach[Int](println) printSink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[Unit]] = akka.stream.scaladsl.Sink@7d6cac88 scala val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] actionStream: scala.collection.mutable.ArrayBuffer[Int = Int] = ArrayBuffer() scala def add(a: Int) = a + 1 add: (a: Int)Int scala scala def exec(): Unit = | actionStream.drop(1).foldLeft(source)( | (source, action) = source.via(Flow[Int].mapAsync(e = Future(action(e | ).runWith(printSink) exec: ()Unit scala scala actionStream += add res6: actionStream.type = ArrayBuffer(function1) scala actionStream += add res7: actionStream.type = ArrayBuffer(function1, function1) scala exec() scala 2 3 4 5 6 7 8 9 10 11 On Thu, Apr 2, 2015 at 11:29 PM, Allen Nie aiming...@gmail.com wrote: I came up a much simplified program to test, and I still get nothing: val source: Source[Int, Unit] = Source(1 to 10) val printSink = Sink.foreach[Int](e = println(e)) val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] def add(a: Int) = a + 1 def exec(): Unit = { val sourceReady = actionStream.drop(1).foldLeft(source) {(source, action) = source.via(Flow[Int].mapAsync(e = Future(action.apply(e } sourceReady sourceReady.runWith(printSink) } parallel test should work in { actionStream += ((a: Int) = add(a)) actionStream += ((a: Int) = add(a)) exec() } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Akka Stream is not running (without compiler error)
Works on my machine: scala import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer scala import akka.stream._ import akka.stream._ scala import akka.stream.scaladsl._ import akka.stream.scaladsl._ scala implicit val sys = ActorSystem(repl) sys: akka.actor.ActorSystem = akka://repl scala import sys.dispatcher import sys.dispatcher scala implicit val fm = ActorFlowMaterializer() fm: akka.stream.ActorFlowMaterializer = ActorFlowMaterializerImpl(ActorFlowMaterializerSettings(4,16,,function1,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,Optimizations(false,false,false,false)),akka.dispatch.Dispatchers@3dff9cf6 ,Actor[akka://repl/user/$b#575469743],1,flow,Optimizations(false,false,false,false)) scala val source: Source[Int, Unit] = Source(1 to 10) source: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@4d8aca2f scala val printSink = Sink.foreach[Int](println) printSink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[Unit]] = akka.stream.scaladsl.Sink@7d6cac88 scala val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] actionStream: scala.collection.mutable.ArrayBuffer[Int = Int] = ArrayBuffer() scala def add(a: Int) = a + 1 add: (a: Int)Int scala scala def exec(): Unit = | actionStream.drop(1).foldLeft(source)( | (source, action) = source.via(Flow[Int].mapAsync(e = Future(action(e | ).runWith(printSink) exec: ()Unit scala scala actionStream += add res6: actionStream.type = ArrayBuffer(function1) scala actionStream += add res7: actionStream.type = ArrayBuffer(function1, function1) scala exec() scala 2 3 4 5 6 7 8 9 10 11 On Thu, Apr 2, 2015 at 11:29 PM, Allen Nie aiming...@gmail.com wrote: I came up a much simplified program to test, and I still get nothing: val source: Source[Int, Unit] = Source(1 to 10) val printSink = Sink.foreach[Int](e = println(e)) val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] def add(a: Int) = a + 1 def exec(): Unit = { val sourceReady = actionStream.drop(1).foldLeft(source) {(source, action) = source.via(Flow[Int].mapAsync(e = Future(action.apply(e } sourceReady sourceReady.runWith(printSink) } parallel test should work in { actionStream += ((a: Int) = add(a)) actionStream += ((a: Int) = add(a)) exec() } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Akka Stream is not running (without compiler error)
You'll need to either use a test framework which can deal with async tests or you'll have to wait for the result and perform the assertion at the end of each test. -- Cheers, √ On 3 Apr 2015 00:21, Allen Nie aiming...@gmail.com wrote: Thank you Viktor!! I tried to switch from test to App, and it works. Also it works with Map instead of MapAsync. Does this mean I can not test this kind of things anymore? Is there anything I can do to still make the test case work?? Sincerely, Allen On Thursday, April 2, 2015 at 6:04:24 PM UTC-4, √ wrote: I suspect your problem is that your test exits before the stream has a chance to run. On Thu, Apr 2, 2015 at 11:56 PM, Viktor Klang viktor...@gmail.com wrote: Works on my machine: scala import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer scala import akka.stream._ import akka.stream._ scala import akka.stream.scaladsl._ import akka.stream.scaladsl._ scala implicit val sys = ActorSystem(repl) sys: akka.actor.ActorSystem = akka://repl scala import sys.dispatcher import sys.dispatcher scala implicit val fm = ActorFlowMaterializer() fm: akka.stream.ActorFlowMaterializer = ActorFlowMaterializerImpl( ActorFlowMaterializerSettings(4,16,,function1, StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,Optimizations(false,false, false,false)),akka.dispatch.Dispatchers@3dff9cf6,Actor[ akka://repl/user/$b#575469743],1,flow,Optimizations(false, false,false,false)) scala val source: Source[Int, Unit] = Source(1 to 10) source: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@4d8aca2f scala val printSink = Sink.foreach[Int](println) printSink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[Unit]] = akka.stream.scaladsl.Sink@7d6cac88 scala val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] actionStream: scala.collection.mutable.ArrayBuffer[Int = Int] = ArrayBuffer() scala def add(a: Int) = a + 1 add: (a: Int)Int scala scala def exec(): Unit = | actionStream.drop(1).foldLeft(source)( | (source, action) = source.via(Flow[Int].mapAsync(e = Future(action(e | ).runWith(printSink) exec: ()Unit scala scala actionStream += add res6: actionStream.type = ArrayBuffer(function1) scala actionStream += add res7: actionStream.type = ArrayBuffer(function1, function1) scala exec() scala 2 3 4 5 6 7 8 9 10 11 On Thu, Apr 2, 2015 at 11:29 PM, Allen Nie aimi...@gmail.com wrote: I came up a much simplified program to test, and I still get nothing: val source: Source[Int, Unit] = Source(1 to 10) val printSink = Sink.foreach[Int](e = println(e)) val actionStream: ArrayBuffer[(Int) = Int] = ArrayBuffer.empty[(Int) = Int] def add(a: Int) = a + 1 def exec(): Unit = { val sourceReady = actionStream.drop(1).foldLeft(source) {(source, action) = source.via(Flow[Int].mapAsync(e = Future(action.apply(e } sourceReady sourceReady.runWith(printSink) } parallel test should work in { actionStream += ((a: Int) = add(a)) actionStream += ((a: Int) = add(a)) exec() } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit
Re: [akka-user] 2.4-SNAPSHOT Did bind-hostname/bind-port functionality change?
Hi Greg, have you verified that the configuration is applied to the application? On Tue, Apr 14, 2015 at 10:13 AM, tigerfoot gzol...@gmail.com wrote: Hello, I had a working demo of Akka remoting working in a Docker container. I ran my server in Docker and was able to communicate with it from an external program. My application.conf looked like this: akka { loglevel = ERROR stdout-loglevel = ERROR loggers = [akka.event.slf4j.Slf4jLogger] actor { provider = akka.remote.RemoteActorRefProvider } remote { enabled-transports = [akka.remote.netty.tcp] netty.tcp { # Internal addr bind-hostname = localhost # also tried 127.0.0.1 bind-port = 2551 # External Docker addr hostname = 172.16.240.141 port = 9100 } } } Boxed up I run my container like this (to map ports)--the web port mapping works fine: docker run -it -p 9100:2551 -p 9101:8080 --name dexp localhost:5000/root My client tries to connect to it like this: val c = ConfigFactory parseString akka { actor { provider = akka.remote.RemoteActorRefProvider } remote { enabled-transports = [akka.remote.netty.tcp] netty.tcp { hostname = localhost port = 5151 } } } val sys = ActorSystem( boom, c ) val actor = sys.actorSelection(akka.tcp:// dockerexp@172.16.240.141:9100/user/dockerexp) println(Actor: +actor) implicit val timo = Timeout(5.seconds) try { println( Await.result( (actor ? hey).asInstanceOf[Future[String]], 15.seconds) ) } finally { println(Dying...) Thread.sleep(5000) sys.shutdown() } This isn't working anymore--just times out and dies. It did work several months ago when 2.4-SNAPSHOT was first available. Am I doing something wrong? Thanks, Greg -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] ReadPrefered in FlexiMerge (akka-streams)
Is if(input eq p.priority) also true? On Mon, Apr 13, 2015 at 7:11 PM, Johannes Plapp johannes.pl...@gmail.com wrote: Hi, While implementing a FlexiMerge we stumbled on the following issue: override def initialState = State[T](ReadPreferred(p.priority, p.second)) { (ctx, input, element) = if(input == p.priority).. // always true ctx.emit(element) SameState } Even if ReadPrefered returns an element from the second input, the returned input always equals the first one (p.priority). Is this intended behaviour? I also submitted a test covering this issue: https://github.com/akka/akka/issues/17157 Thanks, Johannes Plapp -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Is parent notified when child stops?
What does your observations show? :) On Tue, Apr 21, 2015 at 4:09 PM, Andrew Easter andrew.eas...@gmail.com wrote: Thanks for your reply. In my case, I'm not really interested in knowing exactly when the child stops. I simply want to know whether context.child would no longer return Some(ActorRef) once the child has stopped. Reason being that I'm essentially using context.child to lookup children by name, somewhat akin to a Map - using context.child seems to avoid explicitly having to create my own Map and waiting on Terminated messages to remove children as they stop. On Tuesday, 21 April 2015 12:28:36 UTC+1, Anders Båtstrand wrote: You have to watch the actor to receive the Terminated-message when it stops. context watch childActor As for the question about context.child, I don't know (never used that one myself) Regards, Anders tirsdag 21. april 2015 09.58.25 UTC+2 skrev Andrew Easter følgende: I've struggled to find any explicit documentation to state how a parent actor reacts to a child actor stopping itself. If a child actor calls: context stop self Will its parent be notified that the child has stopped such that a call within the parent to: context.child(name_of_stopped_child) will - once the child has completely stopped - eventually return None? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Can't Seem To Get Schedulers To Work
This works for me, must be your use of thunk-to-function parameters: scala object Concurrency { | import java.util.concurrent.Executors | import akka.dispatch.ExecutionContexts | import akka.actor._ | import scala.concurrent._ | import scala.concurrent.duration._ | object Log { def warn(s: String): Unit = System.err.println(s) } | private val system = ActorSystem(system, defaultExecutionContext = Some(ExecutionContexts.fromExecutor(Executors.newCachedThreadPool( | import system.dispatcher | private val scheduler = system.scheduler | Log.warn(starting up actor system) | Future { Log.warn(runNow works) } | scheduler.scheduleOnce(3.seconds)(Log.warn(scheduleOnce works)) | } defined object Concurrency scala Concurrency.toString starting up actor system runNow works res0: String = Concurrency$@417781bc scala scheduleOnce works On Sun, Apr 26, 2015 at 9:50 PM, Ian Nowland ian.nowl...@gmail.com wrote: I've been following various example code I've found online, but no matter how I rearrange it, I can't seem to get a simple scheduler example to work. This is my current code: object Concurrency extends Loggable { private val system = ActorSystem(system, defaultExecutionContext = Some(ExecutionContexts.fromExecutor(Executors.newCachedThreadPool( import system.dispatcher private val scheduler = system.scheduler Log.warn(starting up actor system) def runNow(f: = () = Unit):Unit = Future {f()} def scheduleOnce(duration:FiniteDuration, f: = () = Unit):Unit = { scheduler.scheduleOnce(duration)(() = runNow(f)) } runNow(() = Log.warn(runNow works)) scheduleOnce(FiniteDuration(3,TimeUnit.SECONDS),() = Log.warn(scheduleOnce works)) } When I run this, I see my runNow works go off, but not my scheduleOnce. What am I missing? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka with org.apache.log4j.MDC
Hi Anindita, It would be great for us to know what is lacking in the MDC sections of the Akka Logging documentation, please don't hesitate to contribute. Thanks! On Mon, Apr 27, 2015 at 8:09 AM, Anindita Ghatak anindita.g...@gmail.com wrote: Hi, How can I use Akka with org.apache.log4j.MDC ? Thanks Regards, Anindita -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: The best ways to resolve future inside an actor?
Stash and pipeTo is probably the best you can do today. The reason for things to be as they are is because it amounts to what's called a selective receive and it is typically a cause of poor performance during runtime (since it stalls the actor). If you have a more typical pipeline processing requirement you should definitely look into Akka Streams, with its mapAsync operation—with a more constrained target domain as stream processing is, it is possible to create more targeted solutions. Does that help? On Thu, May 14, 2015 at 7:06 PM, Andrew Gaydenko andrew.gayde...@gmail.com wrote: On Thursday, May 14, 2015 at 8:04:50 PM UTC+3, √ wrote: What if it never completes? Timeout is acceptable. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: The best ways to resolve future inside an actor?
What if it never completes? On Thu, May 14, 2015 at 7:03 PM, Andrew Gaydenko andrew.gayde...@gmail.com wrote: Sequence order isn't a problem. RondRobinPool(1) here isn't more rather just a limiting CPU consumption to a single core. As at the first message in the topic, the problem is to gracefully call function returning Future inside a message handler. We don't want answering to sender, don't want to serialize per se.. Just are looking for the best way to resolve a Future inside a receive/case message handler. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] mapConcat
scala.collection.immutable.Iterable seems like a good choice over Seq On Tue, May 12, 2015 at 1:38 PM, Endre Varga endre.va...@typesafe.com wrote: Hi Peter, On Tue, May 12, 2015 at 1:30 PM, Peter Schmitz petrischm...@gmail.com wrote: First of all: Impressive work, guys! Though I have some questions: Why does FlowOps.mapConcat expects a Seq and not a Traversable? (Say I want to pass a SortedSet.) Good question. I think it could be an Iterable (I would avoid Traversable though). I added a ticket. And concerning the Seq: Why restricted to scala.collection.immutable.Seq and not scala.collection.Seq? We always prefer immutable collections in our APIs In this case this is probably overkill, but at least it allows safe closing over of an existing collection in mapConcat from an external scope (since it is immutable). I don't think we will change it. -Endre -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Akka Streams Merge variance
Thinking about it perhaps it makes sense to have a MergeWith, where Merge is using the identity function? -- Cheers, √ On 19 May 2015 02:10, Andrey Kuznetsov f...@loathing.in wrote: I asked myself the same question when I faced to the problem of merging elements from Source[A] and Source[B] where A and B are children of the same trait C. Had to make both sources a Source[C]. On Saturday, May 16, 2015 at 4:47:51 PM UTC+3, Oliver Winks wrote: If you look at the implementation of Merge you find that it is a generic type, but without any defined variance: classMerge[T] extends Graph http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC2/akka/stream/Graph.html [UniformFanInShape http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC2/akka/stream/UniformFanInShape.html [T, T], Unit http://www.scala-lang.org/api/2.10.5/index.html#scala.Unit ] Is there any reason for this? Would it not make sense to define it as a covariant type? e.g: classMerge[+T] extends Graph http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC2/akka/stream/Graph.html [UniformFanInShape http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC2/akka/stream/UniformFanInShape.html [T, T], Unit http://www.scala-lang.org/api/2.10.5/index.html#scala.Unit ] -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-http is there an example of showing how to integrate with servlet container?
One integration you can do easily is to have a ServletContextListener that creates the Akka Http server endpoint when loaded and stops it on unload. On Tue, Apr 14, 2015 at 3:53 PM, Roland Kuhn goo...@rkuhn.info wrote: Hi Tomer, the servlet container way of running things completely negates all the benefits that Akka HTTP would give you, so there is no point in supporting this scenario—spray-servlet will not be ported to Akka HTTP. Sorry for the bad news, Roland 14 apr 2015 kl. 15:45 skrev Jas tomer...@gmail.com: is there an example showing how to integrate servlet container with akka-http? i'm currently using spray with serlvet with https://github.com/spray/spray/blob/master/spray-servlet/src/main/scala/spray/servlet/Servlet30ConnectorServlet.scala and was hoping to find an example showing how to perform this with akka-http.. thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. *Dr. Roland Kuhn* *Akka Tech Lead* Typesafe http://typesafe.com/ – Reactive apps on the JVM. twitter: @rolandkuhn http://twitter.com/#!/rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] CPU spikes using fork-join-executor
I'd bet on this explanation too. Having looked at the code I think it should be possible to switch implementations to something without spinlocking. On Fri, Apr 17, 2015 at 8:20 AM, Roland Kuhn goo...@rkuhn.info wrote: Hi Akara, thanks for all the detailed information, I am 95% certain that I know what happens—and I’d call it a bug. To the casual observer the code in DirectByteBufferPool looks like a CAS loop, but it actually is an open-coded primitive spinlock. If you have more threads competing for this lock than you have CPUs it might well happen that the kernel decides to schedule out a thread that was holding this lock, and since it is not a normal, honest lock, the kernel has no idea why the other threads now all are spinning like mad. It probably takes some server-type hardware and a bit of patience and good monitoring to actually notice, which is why you are the first person to report it :-) I created a ticket https://github.com/akka/akka/issues/17216 to track this. Regards, Roland 17 apr 2015 kl. 05:20 skrev Akara Sucharitakul akara.sucharita...@gmail.com: Nope, this is actually Ubuntu 10.10 on a 4vCPU VM (likely ESX but I can't tell for sure). Couple more pieces of information: - Further experiments with parallelism-factor lowered to 1.0 no longer shows the spikes. - Trying to profile the spikes with JMC would cause the spikes not to happen. The sampling somehow changed the timing characteristics. - Switching the ForkJoinExecutor to Java8, the initial experiments show similar spikes. Since this was quick implementation of ExecutorServiceConfigurator to get Java 8 ForkJoinPool to work, there is still a bit of refinement we need to do before we can conclude on this fact. I'll keep updating this thread as we find out more. Expect it to be a long process, though. Thanks! -Akara On Thursday, April 16, 2015 at 5:03:48 AM UTC-7, Martynas Mickevičius wrote: Hi, are you running this application on Windows? There is a known situation https://github.com/akka/akka/issues/16942 where high CPU usage might occur when using TCP with akka IO. On Tue, Apr 14, 2015 at 7:28 AM, Akara Sucharitakul akara.suc...@gmail.com wrote: Scala 2.11.6, Akka 2.3.9, Spray 1.3.2. When testing a particular workload, we ran into CPU spikes. The avg CPU load is less than 5% but about 6-7 minutes into the run, we start to see CPU spiking to near 100% lasting for several seconds. This repeats itself every 6-7 minutes. We can't correlate this with a particular GC activity just yet. We tested against Java 7_45 but tried Java 8 without much change. Spray is configured on the default-dispatcher (Spray default). Trying to capture thread dumps at the time spikes happening reveal 9 threads spinning on the followings: default-akka.actor.default-dispatcher-11 prio=10 tid=0x01b2b800 nid=0x3752 runnable [0x7f1191d4] java.lang.Thread.State: RUNNABLE at akka.io.DirectByteBufferPool.takeBufferFromPool(DirectByteBufferPool.scala:41) at akka.io.DirectByteBufferPool.acquire(DirectByteBufferPool.scala:31) at akka.io.TcpConnection.doRead(TcpConnection.scala:231) at akka.io.TcpConnection$$anonfun$connected$1.applyOrElse(TcpConnection.scala:87) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.io.TcpConnection.aroundReceive(TcpConnection.scala:28) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2 threads show this spinning: default-akka.actor.default-dispatcher-8 prio=10 tid=0x01d6f000 nid=0x374f runnable [0x7f11928d2000] java.lang.Thread.State: RUNNABLE at akka.io.DirectByteBufferPool.offerBufferToPool(DirectByteBufferPool.scala:66) at akka.io.DirectByteBufferPool.release(DirectByteBufferPool.scala:34) at akka.io.TcpConnection.doRead(TcpConnection.scala:245) at akka.io.TcpConnection$$anonfun$connected$1.applyOrElse(TcpConnection.scala:87) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.io.TcpConnection.aroundReceive(TcpConnection.scala:28) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at
Re: [akka-user] How to restart a node?
The reason why it is not possible to rejoin is because other nodes have already acted upon the death of that node. Allowing the Undead in a cluster makes it really hard to reason about. On Sat, Apr 11, 2015 at 1:36 AM, Andrey Ilinykh ailin...@gmail.com wrote: Thank you! It works. But still there is a slim chance something goes wrong (for example java process crashed). What a reason not to allow the same actor system join multiple times? As far as I understand each actor system has internal UUID which is generated every time you start akka. If some how this UUID persists between restarts will akka cluster allow to join multiple times? Thank you, Andrey On Friday, April 10, 2015 at 2:29:41 AM UTC-7, Akka Team wrote: Hi, Even if you turn off auto-downing you still have the programmatic API: Cluster(system).down(address) So if you have a logic for downing that can be automatized (i.e. expressed in Scala) the you can implement it by using the above API. But if I understood correctly, you want to stop a node gracefully before starting up it again with new code. In this case ( http://doc.akka.io/docs/akka/2.3.9/scala/cluster-usage.html#Leaving): A more graceful exit can be performed if you tell the cluster that a node shall leave. This can be performed using *JMX* http://doc.akka.io/docs/akka/2.3.9/scala/cluster-usage.html#cluster-jmx-scala or*Command Line Management* http://doc.akka.io/docs/akka/2.3.9/scala/cluster-usage.html#cluster-command-line-scala. It can also be performed programatically with Cluster(system).leave( address). Note that this command can be issued to any member in the cluster, not necessarily the one that is leaving. The cluster extension, but not the actor system or JVM, of the leaving member will be shutdown after the leader has changed status of the member to Exiting. Thereafter the member will be removed from the cluster. Normally this is handled automatically, but in case of network failures during this process it might still be necessary to set the node’s status toDown in order to complete the removal. So if you have an API that can tell the node to restart, you can just - leave the cluster manually - after successfully leaving or timing out, shut down the actor system -Endre On Thu, Apr 9, 2015 at 8:52 PM, Andrey Ilinykh aili...@gmail.com wrote: Hello everybody! I have a simple cluster with disabled auto downing. (I don't want a cluster to be partitioned eventually). As result every time I deploy new code node becomes unreachable. I have to down it manually. Which is annoying even you have two nodes. For big cluster it become nightmare. So, my question is - how to restart node easily. Or what is the best practice to deploy new code? Thank you, Andrey -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - Reactive apps on the JVM Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Refer to the same cluster-wide router from several actors in the cluster on the same node
actorOf creates a new actor, and in this case you're doing it at the system level from within another actor: val httpWorkers = context.system.actorOf(FromConfig.props(Props.empty), cluster_router_httpworker) That's the problem. On Sun, Apr 5, 2015 at 6:54 PM, Eugene Dzhurinsky jdeve...@gmail.com wrote: Hello! I have a cluster configuration like this: /cluster_router_httpworker { router = consistent-hashing-group nr-of-instances = 10 routees.paths = [ /user/router_httpworker ] cluster { enabled = on max-nr-of-instances-per-node = 5 allow-local-routees = off use-role = http } }, /cluster_router_chunkworker { router = consistent-hashing-group nr-of-instances = 5 routees.paths = [/user/router_chunkworker ] cluster { enabled = on max-nr-of-instances-per-node = 1 allow-local-routees = off use-role = chunk } } Now I start a *TaskChunkActor* the as below: val sys = ActorSystem(HttpCluster, config) val ref = sys.actorOf(Props[TaskChunkActor].withRouter(RoundRobinPool(10)), router_chunkworker) val clusterRef = sys.actorOf(FromConfig.props(Props.empty), cluster_router_chunkworker) The *TaskChunkActor* initializes its internal references to the *HttpWorker* : val httpWorkers = context.system.actorOf(FromConfig.props(Props.empty), cluster_router_httpworker) At this point I'm getting the exception: akka.actor.InvalidActorNameException: actor name [cluster_router_httpworker] is not unique! at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130) at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76) at akka.actor.ActorCell.reserveChild(ActorCell.scala:369) at akka.actor.dungeon.Children$class.makeChild(Children.scala:201) at akka.actor.dungeon.Children$class.attachChild(Children.scala:41) at akka.actor.ActorCell.attachChild(ActorCell.scala:369) at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553) It seems that every *TaskChunkActor* tries to create it's own reference to the cluster-related actor, and fails. I see that I could pass the route to every instance of *TaskChunkActor* via constructor, but perhaps there's a way to either create an instance of the actor from config or return the existing one? Thanks! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Best practices for selecting or creating actor
val ref = context.child(name).getOrElse(context.actorOf(props, name)) On Sun, Apr 5, 2015 at 11:53 PM, Adam adamho...@gmail.com wrote: First of all, you shouldn't ever block like this, as you do with Await. As for your question - this sounds like something the parent actor should be responsible for. I'm not even sure the code above works (it at least never occurred to me to try to create an actor using a full path as I always understood the docs as if it shouldn't be possible). I think allowing the parent actor to determine if one of his children exists or not is cleaner. It also has direct access to this through the actor context, so the answer is immediate. On Sunday, April 5, 2015 at 10:50:28 PM UTC+3, Fatih Dönmez wrote: Hi, I want to create an actor if it didn't created already. To check its existence I use actorSelection. If there is no actor defined by the path, I create a new one with actorOf. Is this approach correct? How can I improve the design? Here is the code I come up with currently; val fullPath: String = path.format(appId) var starter: ActorRef = null implicit val timeout = Timeout(5 seconds) try { starter = Await.result(context.actorSelection(fullPath). resolveOne,FiniteDuration(5,TimeUnit.SECONDS)) } catch { case e: ActorNotFound = { starter = context.actorOf(Props(new StarterNode(customer, appId)), name = fullPath) logger.info(Actor [ + fullPath + ] creating for first time) } case e: Exception = logger.error(Actor [ + fullPath + ] selection failed,e) } //TODO if(starter != null) { starter ! event } else { logger.warn(Starter node failed with timeout) } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Java : How should Futures be handled when Unit testing?
Does this help? http://christopher-batey.blogspot.se/2014/02/testing-scala-futures-with-scalatest-20.html On Thu, Apr 9, 2015 at 1:27 PM, Adam Daines dainesa...@gmail.com wrote: Hi all, I've got a question in relation to the unit testing of a piece of non actor code that produces a FutureActorRef via performing an actorSelection().resolveOne on the ActorSystem. Within the Future.onFailure() a value is being set that I would like to test but the Future is making use of the ActorSystem.dispatcher() which is therefore causing asynchronicity issues when attempting to Assert that the value has been set as expected. I am currently performing a Thread.sleep() within the Unit test to provide enough time for the value to have been set but this is far from ideal! What is the best way to go about testing this type of code without having to sleep and wait? Is it possible to override the dispatcher that the ActorSystem returns with the CallingThreadDispatcher? Thanks. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] 2-way communication between Akka Streams and (remote) actors
You could have a mapAsync and use ask on the remote ActorRef? On Thu, Apr 9, 2015 at 8:49 PM, Robin Green gree...@gmail.com wrote: Closely related to this question https://groups.google.com/forum/#!searchin/akka-user/streams$20actors/akka-user/AxVI8T_laYg/Qsi3BfCzQMwJ, I know that Akka Streams do not yet support remote materialisation, but what if you want to have Akka Streams handle a local graph of stream processing, and have some intermediate processing work done by existing (possibly remote) actors? The way I would approach it is to have an ActorSubscriber as a sink to talk to each actor and an ActorPublisher as a source to receive results for each actor, but this is likely to lead to multiple flows or graphs that are disconnected from each other. Is there any more elegant way to approach it? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Java : How should Futures be handled when Unit testing?
Hi Adam, I generally recommend against using onComplete, onSuccess and onFailure for that reason, instead use map/flatMap/andThen/recover/recoverWith/transform On Thu, Apr 9, 2015 at 4:14 PM, Adam Daines dainesa...@gmail.com wrote: Hi again, At first it had appeared that the issue was fully resolved but the test I had written began to fail again intermittently. It appears that taking the approach Endre suggested does not quite work as I had expected. It appears that the Await triggers and unblocks at the moment that the Future gets completed which is before the .onSuccess .onFailure have actually run. In my use case the variable (which is a deferredResult) is actually set inside the .onFailure, therefore I cannot do an assertion on it until after the .onFailure has finished running. Any further assistance would be much appreciated. Thanks. On Thursday, April 9, 2015 at 1:46:22 PM UTC+1, Adam Daines wrote: Thank you both for the quick and helpful responses! Our issue is now resolved :) Thanks. On Thursday, April 9, 2015 at 1:01:24 PM UTC+1, √ wrote: Ouch, nice catch, now we've covered both the question and the alternative (Scala) :) On Thu, Apr 9, 2015 at 1:59 PM, Akka Team akka.o...@gmail.com wrote: On Thu, Apr 9, 2015 at 1:33 PM, Viktor Klang viktor...@gmail.com wrote: Does this help? http://christopher-batey.blogspot.se/2014/02/testing- scala-futures-with-scalatest-20.html That is for scala though. We usually use scala.concurrent.Await which can be used like this assert(Await.result(myFuture, timeoutDuration) == xyz) This will block the test and wait until the future becomes ready. If the future completes then the assertion will run. If the future fails, then Await.result() will throw the exception. If the future does not complete during the time defined in timeoutDuration, then it will throw A TimeoutException. -Endre On Thu, Apr 9, 2015 at 1:27 PM, Adam Daines daine...@gmail.com wrote: Hi all, I've got a question in relation to the unit testing of a piece of non actor code that produces a FutureActorRef via performing an actorSelection().resolveOne on the ActorSystem. Within the Future.onFailure() a value is being set that I would like to test but the Future is making use of the ActorSystem.dispatcher() which is therefore causing asynchronicity issues when attempting to Assert that the value has been set as expected. I am currently performing a Thread.sleep() within the Unit test to provide enough time for the value to have been set but this is far from ideal! What is the best way to go about testing this type of code without having to sleep and wait? Is it possible to override the dispatcher that the ActorSystem returns with the CallingThreadDispatcher? Thanks. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - Reactive apps on the JVM Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User
Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
pushRecursively() onComplete { case Failure(ex) = onError(ex) context.stop(self) case s = } that's not safe. On Thu, Apr 9, 2015 at 1:16 PM, Jakub Liska liska.ja...@gmail.com wrote: I do it just via ActorPublisher, the scroll method is basically asynchronously loading elasticsearch records (classic cursor thingy). It's a combination of request demand and asynchronous source of events : def receive: Receive = { case Request(n) if totalDemand 0 n 0 isActive = def pushRecursively(n: Long = Math.min(n, totalDemand), scrollId: String = lastScrollId): Future[Unit] = { scroll(scrollId) flatMap { case (sid, recs) if recs.isEmpty = // empty hits means end of scanning/scrolling onComplete() context.stop(self) Future.successful(()) case (sid, recs) = lastScrollId = sid val contexts = recs.map { case (recId, rec) = EsResource :: Map.empty[String, String] :: recId :: rec :: HNil } onNext(contexts) if (n 1) pushRecursively(n-1, sid) else Future.successful(()) } } pushRecursively() onComplete { case Failure(ex) = onError(ex) context.stop(self) case s = } case Cancel = context.stop(self) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Java : How should Futures be handled when Unit testing?
Ouch, nice catch, now we've covered both the question and the alternative (Scala) :) On Thu, Apr 9, 2015 at 1:59 PM, Akka Team akka.offic...@gmail.com wrote: On Thu, Apr 9, 2015 at 1:33 PM, Viktor Klang viktor.kl...@gmail.com wrote: Does this help? http://christopher-batey.blogspot.se/2014/02/testing-scala-futures-with-scalatest-20.html That is for scala though. We usually use scala.concurrent.Await which can be used like this assert(Await.result(myFuture, timeoutDuration) == xyz) This will block the test and wait until the future becomes ready. If the future completes then the assertion will run. If the future fails, then Await.result() will throw the exception. If the future does not complete during the time defined in timeoutDuration, then it will throw A TimeoutException. -Endre On Thu, Apr 9, 2015 at 1:27 PM, Adam Daines dainesa...@gmail.com wrote: Hi all, I've got a question in relation to the unit testing of a piece of non actor code that produces a FutureActorRef via performing an actorSelection().resolveOne on the ActorSystem. Within the Future.onFailure() a value is being set that I would like to test but the Future is making use of the ActorSystem.dispatcher() which is therefore causing asynchronicity issues when attempting to Assert that the value has been set as expected. I am currently performing a Thread.sleep() within the Unit test to provide enough time for the value to have been set but this is far from ideal! What is the best way to go about testing this type of code without having to sleep and wait? Is it possible to override the dispatcher that the ActorSystem returns with the CallingThreadDispatcher? Thanks. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - Reactive apps on the JVM Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: akka-streams: FlexiRoute - route specific elements to specific outlets
What will you do if you get B's but only demand for As? -- Cheers, √ On 9 Apr 2015 19:10, Andrey Kuznetsov f...@loathing.in wrote: If I change State condition to DemandFromAll, stage will wait for all outlets to start demanding but I need elements to be emitted as soon as possible. On Thursday, April 9, 2015 at 7:58:45 PM UTC+3, Andrey Kuznetsov wrote: I need to create a stage with one inlet accepting elements of type Message and three outlets emitting elements of types A, B, C, all of which are Message subtypes. Depending on which element came (A, B or C) it should be emitted on one of three outlets. I am wondering how is it possible to achieve? If I create a State with DemandFromAny(all three outlets), I have no guarantee that element passed in State's onInput is an instance of type that demanding outlets should emit. Here is FlexiRoute example (I simplified it to Message, A, B and C types to make in my app-context independent). It works only when it is so lucky that element matches type of the demanding outlet. sealed trait Message class A extends Message class B extends Message class C extends Message class MessageDiscriminatorShape(_init: Init[Message] = Name[Message](MessageDiscriminator)) extends FanOutShape[Message](_init) { val outA = newOutlet[A](outA) val outB = newOutlet[B](outB) val outC = newOutlet[C](outC) protected override def construct(i: Init[Message]) = new MessageDiscriminatorShape(i) } class MessageDiscriminator extends FlexiRoute[Message, MessageDiscriminatorShape]( new MessageDiscriminatorShape, OperationAttributes.name(MessageDiscriminator)) { import FlexiRoute._ override def createRouteLogic(p: PortT) = new RouteLogic[Message] { override def initialState = State[Any](DemandFromAny(p.outlets)) { (ctx, _, element) = element match { case e: A = ctx.emit(p.outA)(e) case e: B = ctx.emit(p.outB)(e) case e: C = ctx.emit(p.outC)(e) } SameState } override def initialCompletionHandling = eagerClose } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Flow supervision decider
From my PoV: It is vital to distinguish stream fatal errors from transient element processing error, the first terminates the stream abruptly, the second should be modeled within the processing domain, by transmitting things like Try[T] as elements for instance. On Wed, May 20, 2015 at 10:06 PM, Patrik Nordwall patrik.nordw...@gmail.com wrote: One thing to remember is that an upstream failure will be propagated downstream immediately without backpressure and thereby overtake previously emitted (buffered) elements, and transforming such an error to an element further downstream may result in unexpected order of elements. Another thing is that such a failure will cancel upstream and that will be difficult to coordinate with a (later) downstream recovery. It is sure possible to implement for a specific stage, but then it is perhaps confusing that it is only catching errors from the preceding stage. This is just my 2c, so if you want a real assessment you are welcome to create a github issue. /Patrik 20 maj 2015 kl. 18:24 skrev dpratt david.pr...@gmail.com: Sorry - hit send too soon. foo.recover { case (NonFatal(e), failedValue) = log.error(e, Problem processing stream value of {}, failedValue) UNKNOWN VALUE } On Wednesday, May 20, 2015 at 11:23:02 AM UTC-5, dpratt wrote: What if I have an existing stage/Flow that I do not have control over, or where it would not make sense to conflate the flow logic with the exception handling? For example val foo: Flow[String, String, Unit] = SomeLibrary.somethingThatGeneratesAFlow() how would I wrap foo with error handling? I can't use map or mapAsync, since those are compositional - namely, the value to map has already been calculated. What I really want is a recover block on the flow itself - something like foo.recover { } On Wednesday, May 20, 2015 at 4:37:37 AM UTC-5, Patrik Nordwall wrote: I think we considered adding this to the stream supervision mechanism, but since it is not possible to express the types of the elements there in any sane way we decided to not do it. Instead we said that this specific recover scenario should be handled with try-catch within the function/stage of yours. For mapAsync you can use recover on the Future. By the way, you can define the supervision for individual stages by using the withAttributes. Regards, Patrik On Fri, May 15, 2015 at 7:50 PM, dpratt david...@gmail.com wrote: I've been using the Streams API to build a few things for the past couple months, and I have a humble suggestion for an API enhancement. I'm not sure if this is even possible to do given the contract of how a Flow operates, adding a method to FlowOps with the following signature would be quite useful - def recover(f: PartialFunction[(In, Throwable), Out]): Repr[Out, Mat] It's likely due to the fact that I have yet to fully internalize the Flow API, but I've found that the supervision functionality isn't exactly what I need. On the top-level, it makes complete sense, but there is no way to deal with an error in a stream and not have at least one message silently dropped. It would be nice to be able to set up more fine-grained error handling. As an example, imagine a stream that was processing incoming deltas to a set of records held either in memory or some persistent data store. A failure of a given delta should not necessarily shut down the whole pipeline, but the associated record should be marked as inconsistent and dealt with appropriately. Using the current supervision API, there's no way to determine the actual element that caused the failure, and thus there's no real way to handle it or signal an external system with the details of the error. Of course, you can work around this by making the stream operate on a Try[T] instead of T, but that just seems unwieldy. Am I looking at this the wrong way? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to
Re: [akka-user] Failure in inner stream not reflecting in the outer stream
My point is that if there are multiple elements in the substream then there could be multiple failures, multiple successes or a combination thereof. How would you represent that in a single result? On Fri, Jun 5, 2015 at 12:27 PM, Prog programad...@gmail.com wrote: I would like to have the same failure as I get in the inner onComplete akka.http.scaladsl.model.EntityStreamException: Entity stream truncation As the stream was not complete I think there is no reason the outer onComplete finish with success. On Friday, June 5, 2015 at 12:22:10 PM UTC+2, √ wrote: What behavior would you want when there are multiple elements in the first source? What would the value in the outer onComplete be? On Fri, Jun 5, 2015 at 12:17 PM, Prog progra...@gmail.com wrote: Hi, I have the code bellow and I have tried many different variations of it with mapAsync in the place of forEach and goes on, but for some reason when I break the communication on the server side in the middle of the transfer I get the error in the inside Sink.onComplete, but the error it is not reflected to the outer Sink.onComplete. So I get in the console: in error --- out success I would like to have a Failure in both Sink.onComplete. How can I achieve that? val conn: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host) val finish = Source.single(HttpRequest(uri = uri)).via(conn).runForeach({ response = if (response.status == StatusCodes.OK) { response.entity.dataBytes.map { data = println(data) }.runWith(Sink.onComplete({ case Success(resp) = { println( in success) } case Failure(error) = { println( in failure) } case _ = { println( in error) } })) } else { println(status+ response.status)} }) finish.onComplete({ case Success(resp) = { println( out succes: + resp) } case Failure(error) = { println( out failure) } case _ = { println( out error) } }) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Failure in inner stream not reflecting in the outer stream
If you want to break the outer stream on the first failure of the inner stream then I recommend using mapAsync and have the inner stream return a future in the inner stream. On Fri, Jun 5, 2015 at 2:39 PM, Prog programad...@gmail.com wrote: My point is imagine you have a large file (1GB) received by that stream (I don't know if the file come as a chunked message or not ), even if the file was not complete you get the response in the outer stream that it was successful . So I could not send the outer future to some other function to do some tasks if the file was downloaded successful or truncated by failure, as it would send a false positive. Is there any way I could aggregate the futures in the inner stream and break the outer stream in case of failure, as I have in this case a dependency relation between the inner and outer streams? On Friday, June 5, 2015 at 1:42:05 PM UTC+2, √ wrote: My point is that if there are multiple elements in the substream then there could be multiple failures, multiple successes or a combination thereof. How would you represent that in a single result? On Fri, Jun 5, 2015 at 12:27 PM, Prog progra...@gmail.com wrote: I would like to have the same failure as I get in the inner onComplete akka.http.scaladsl.model.EntityStreamException: Entity stream truncation As the stream was not complete I think there is no reason the outer onComplete finish with success. On Friday, June 5, 2015 at 12:22:10 PM UTC+2, √ wrote: What behavior would you want when there are multiple elements in the first source? What would the value in the outer onComplete be? On Fri, Jun 5, 2015 at 12:17 PM, Prog progra...@gmail.com wrote: Hi, I have the code bellow and I have tried many different variations of it with mapAsync in the place of forEach and goes on, but for some reason when I break the communication on the server side in the middle of the transfer I get the error in the inside Sink.onComplete, but the error it is not reflected to the outer Sink.onComplete. So I get in the console: in error --- out success I would like to have a Failure in both Sink.onComplete. How can I achieve that? val conn: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host) val finish = Source.single(HttpRequest(uri = uri)).via(conn).runForeach({ response = if (response.status == StatusCodes.OK) { response.entity.dataBytes.map { data = println(data) }.runWith(Sink.onComplete({ case Success(resp) = { println( in success) } case Failure(error) = { println( in failure) } case _ = { println( in error) } })) } else { println(status+ response.status)} }) finish.onComplete({ case Success(resp) = { println( out succes: + resp) } case Failure(error) = { println( out failure) } case _ = { println( out error) } }) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Re: [akka-user] Re: [akka-hhtp][akka-streams] Process http request with flow
Hi, I don't understand what you mean. What does Does not solve my problem mean in this case? What -is- the result of the flow? On Tue, Jun 9, 2015 at 9:30 AM, Владимир Морозов greenhos...@gmail.com wrote: Hi, Oh... but if I change Sink.ignore to Sink.head[String] (for example) it not solve my problem, because ref (my stream) have type ActorRef. What type of Sink I need to use, for obtain result of flow, if as source used actorPublisher? вторник, 9 июня 2015 г., 10:23:41 UTC+3 пользователь √ написал: The results will travel to the Sink, so you'll have to not `ignore` the result. On Tue, Jun 9, 2015 at 9:15 AM, Владимир Морозов green...@gmail.com wrote: Hi, I play more with actorPublisher, yes it is great thing, but - when I have stream like this: val jobManagerSource = Source.actorPublisher[UserRegisterSource. RegisterUser](UserRegisterSource.props) val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) ref ? UserRegisterSource.RegisterUser(test) And then ask it, like in previous message, in ask result I see what jobManagerSource answer me. I can't understand how I can send some value to actorPublisher source and then obtain result of flow execution понедельник, 8 июня 2015 г., 1:10:01 UTC+3 пользователь Владимир Морозов написал: I solve this problem, but found new one: val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) val t = (ref ask UserRegisterSource.RegisterUser(test)).mapTo[UserRegisterSource.RegisterUser] but t never complete, always end with Timeout exception воскресенье, 7 июня 2015 г., 23:13:29 UTC+3 пользователь Владимир Морозов написал: Hi, I try but get error message: java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0 My code is: val jobManagerSource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props) val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) ref ! UserRegisterSource.RegisterUser(test) class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] { import akka.stream.actor.ActorPublisherMessage._ import UserRegisterSource._ val MaxBufferSize = 100 var buf = Vector.empty[RegisterUser] override def receive: Receive = { case request: RegisterUser = onNext(request) case Cancel = context.stop(self) } } воскресенье, 7 июня 2015 г., 14:53:15 UTC+3 пользователь Akka Team написал: Hi, On Fri, Jun 5, 2015 at 7:53 PM, Владимир Морозов green...@gmail.com wrote: Yes, I know about mapAsync, but my problem with Source. A want to use single stream for processing some group of events I am not sure if I understand your question properly, but if you want to run the stream *once* and then reuse the stream for all the requests, then you can try to create an ActorPublisher as your Source, then run the stream outside of the Http handler logic. You can then use ask on that actor in the Http route. -Endre пятница, 5 июня 2015 г., 20:29:16 UTC+3 пользователь Paul Kinsky написал: Use mapAsync: `def mapAsync[T](parallelism: Int)(f: (Out) ⇒ Future[T]): Repr[T, Mat]`. On Friday, June 5, 2015 at 9:23:24 AM UTC-7, Владимир Морозов wrote: Hi all, I have some simple application based on akka streams and http: My Flow items: val resultSink = Sink.head[String] val fl0 = Flow[String].map(_.toInt) val fl2 = Flow[Int].map{ case value = Thread.sleep(1) value.toString } val fl3 = Flow[String].mapAsync(1)(callMe) Part of my akka-http route: val route = get { path(test) { path(register) { parameter('username) { case username: Username = Source.single(username).via(fl0).via(fl2).via(fl3).runWith(resultSink): Future[String] } } } I want to use one flow : Source.single(username). via(fl0).via(fl2).via(fl3).runWith(resultSink) for processing all requests. I want this for use streams back-pressure. But I can't figure out how create Source that can accept values like call def proccess( username: String): Future[String] but with back-pressure, because, for example, a want long-call DB in fl2 step, and DB can't accept more than 1 request per time (only for example) With best regards, Vladimir. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to
Re: [akka-user] Re: [akka-hhtp][akka-streams] Process http request with flow
The results will travel to the Sink, so you'll have to not `ignore` the result. On Tue, Jun 9, 2015 at 9:15 AM, Владимир Морозов greenhos...@gmail.com wrote: Hi, I play more with actorPublisher, yes it is great thing, but - when I have stream like this: val jobManagerSource = Source.actorPublisher[UserRegisterSource. RegisterUser](UserRegisterSource.props) val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) ref ? UserRegisterSource.RegisterUser(test) And then ask it, like in previous message, in ask result I see what jobManagerSource answer me. I can't understand how I can send some value to actorPublisher source and then obtain result of flow execution понедельник, 8 июня 2015 г., 1:10:01 UTC+3 пользователь Владимир Морозов написал: I solve this problem, but found new one: val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) val t = (ref ask UserRegisterSource.RegisterUser(test)).mapTo[UserRegisterSource.RegisterUser] but t never complete, always end with Timeout exception воскресенье, 7 июня 2015 г., 23:13:29 UTC+3 пользователь Владимир Морозов написал: Hi, I try but get error message: java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0 My code is: val jobManagerSource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props) val ref = Flow[UserRegisterSource.RegisterUser] .mapAsync(1)(callMe) .to(Sink.ignore) .runWith(jobManagerSource) ref ! UserRegisterSource.RegisterUser(test) class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] { import akka.stream.actor.ActorPublisherMessage._ import UserRegisterSource._ val MaxBufferSize = 100 var buf = Vector.empty[RegisterUser] override def receive: Receive = { case request: RegisterUser = onNext(request) case Cancel = context.stop(self) } } воскресенье, 7 июня 2015 г., 14:53:15 UTC+3 пользователь Akka Team написал: Hi, On Fri, Jun 5, 2015 at 7:53 PM, Владимир Морозов green...@gmail.com wrote: Yes, I know about mapAsync, but my problem with Source. A want to use single stream for processing some group of events I am not sure if I understand your question properly, but if you want to run the stream *once* and then reuse the stream for all the requests, then you can try to create an ActorPublisher as your Source, then run the stream outside of the Http handler logic. You can then use ask on that actor in the Http route. -Endre пятница, 5 июня 2015 г., 20:29:16 UTC+3 пользователь Paul Kinsky написал: Use mapAsync: `def mapAsync[T](parallelism: Int)(f: (Out) ⇒ Future[T]): Repr[T, Mat]`. On Friday, June 5, 2015 at 9:23:24 AM UTC-7, Владимир Морозов wrote: Hi all, I have some simple application based on akka streams and http: My Flow items: val resultSink = Sink.head[String] val fl0 = Flow[String].map(_.toInt) val fl2 = Flow[Int].map{ case value = Thread.sleep(1) value.toString } val fl3 = Flow[String].mapAsync(1)(callMe) Part of my akka-http route: val route = get { path(test) { path(register) { parameter('username) { case username: Username = Source.single(username).via(fl0).via(fl2).via(fl3).runWith(resultSink): Future[String] } } } I want to use one flow : Source.single(username). via(fl0).via(fl2).via(fl3).runWith(resultSink) for processing all requests. I want this for use streams back-pressure. But I can't figure out how create Source that can accept values like call def proccess( username: String): Future[String] but with back-pressure, because, for example, a want long-call DB in fl2 step, and DB can't accept more than 1 request per time (only for example) With best regards, Vladimir. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - Reactive apps on the JVM Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List
Re: [akka-user] Failure in inner stream not reflecting in the outer stream
What behavior would you want when there are multiple elements in the first source? What would the value in the outer onComplete be? On Fri, Jun 5, 2015 at 12:17 PM, Prog programad...@gmail.com wrote: Hi, I have the code bellow and I have tried many different variations of it with mapAsync in the place of forEach and goes on, but for some reason when I break the communication on the server side in the middle of the transfer I get the error in the inside Sink.onComplete, but the error it is not reflected to the outer Sink.onComplete. So I get in the console: in error --- out success I would like to have a Failure in both Sink.onComplete. How can I achieve that? val conn: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host) val finish = Source.single(HttpRequest(uri = uri)).via(conn).runForeach({ response = if (response.status == StatusCodes.OK) { response.entity.dataBytes.map { data = println(data) }.runWith(Sink.onComplete({ case Success(resp) = { println( in success) } case Failure(error) = { println( in failure) } case _ = { println( in error) } })) } else { println(status+ response.status)} }) finish.onComplete({ case Success(resp) = { println( out succes: + resp) } case Failure(error) = { println( out failure) } case _ = { println( out error) } }) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] How does Akka implement sender and parent (internally)?
Hi Andriy, this is how sender is captured (implicits): https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Actor.scala#L447 this is how context is injected: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Actor.scala#L428 On Thu, Jun 4, 2015 at 4:03 PM, Andriy Drozdyuk dro...@gmail.com wrote: I'm trying to hack together the sender functionality for pykka (in python) --- but I can't figure out how Akka does it. When I look at source for akka ActorRef, all I see is special `no sender` value set everywhere. Can anyone explain to me how sender is acquired? Furthermore, I would also like to have the parent reference. In akka I see when actor is created, parent is not passed to it in constructor. What is the magic here that allows actors to do `.parent`? Much appreciated if anyone knows about these internals. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Failure in inner stream not reflecting in the outer stream
The parameter to mapAsync is how many currently in progress are allowed. I'd probably write it as: val finish = source.via(connectionFlow).mapAsync(1){ case response if response.status == StatusCodes.OK = response.entity.dataBytes.map(println).runForeach(identity) case _ = Future.failed(new IllegalArgumentException(wrong)) }.runForeach(identity) On Fri, Jun 5, 2015 at 4:36 PM, Prog programad...@gmail.com wrote: I have done this code which seems to work, is this the right way to do what you said? How the parameter of the mapAsync influence the process of it? val finish = source.via(connectionFlow).mapAsync(1){ response = if (response.status == StatusCodes.OK) { val futureInner: Future[Unit] = response.entity.dataBytes.map( data = println(data) ).runForeach(r = r) futureInner } else{ Future.failed(new IllegalArgumentException(wrong)) } }.runForeach({resp = resp}) finish.onComplete({ case Success(resp) = { println( out succes: + resp) } case Failure(error) = { println( out failure) } case _ = { println( out error) } }) On Friday, June 5, 2015 at 2:49:28 PM UTC+2, √ wrote: If you want to break the outer stream on the first failure of the inner stream then I recommend using mapAsync and have the inner stream return a future in the inner stream. On Fri, Jun 5, 2015 at 2:39 PM, Prog progra...@gmail.com wrote: My point is imagine you have a large file (1GB) received by that stream (I don't know if the file come as a chunked message or not ), even if the file was not complete you get the response in the outer stream that it was successful . So I could not send the outer future to some other function to do some tasks if the file was downloaded successful or truncated by failure, as it would send a false positive. Is there any way I could aggregate the futures in the inner stream and break the outer stream in case of failure, as I have in this case a dependency relation between the inner and outer streams? On Friday, June 5, 2015 at 1:42:05 PM UTC+2, √ wrote: My point is that if there are multiple elements in the substream then there could be multiple failures, multiple successes or a combination thereof. How would you represent that in a single result? On Fri, Jun 5, 2015 at 12:27 PM, Prog progra...@gmail.com wrote: I would like to have the same failure as I get in the inner onComplete akka.http.scaladsl.model.EntityStreamException: Entity stream truncation As the stream was not complete I think there is no reason the outer onComplete finish with success. On Friday, June 5, 2015 at 12:22:10 PM UTC+2, √ wrote: What behavior would you want when there are multiple elements in the first source? What would the value in the outer onComplete be? On Fri, Jun 5, 2015 at 12:17 PM, Prog progra...@gmail.com wrote: Hi, I have the code bellow and I have tried many different variations of it with mapAsync in the place of forEach and goes on, but for some reason when I break the communication on the server side in the middle of the transfer I get the error in the inside Sink.onComplete, but the error it is not reflected to the outer Sink.onComplete. So I get in the console: in error --- out success I would like to have a Failure in both Sink.onComplete. How can I achieve that? val conn: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host) val finish = Source.single(HttpRequest(uri = uri)).via(conn).runForeach({ response = if (response.status == StatusCodes.OK) { response.entity.dataBytes.map { data = println(data) }.runWith(Sink.onComplete({ case Success(resp) = { println( in success) } case Failure(error) = { println( in failure) } case _ = { println( in error) } })) } else { println(status+ response.status)} }) finish.onComplete({ case Success(resp) = { println( out succes: + resp) } case Failure(error) = { println( out failure) } case _ = { println( out error) } }) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives:
Re: [akka-user] Re: Akka and DDD - Business Space to Solution Space for Bounded Context
Right, in a way Bounded Contexts is a mitigation against LCD. On Sat, Jun 6, 2015 at 7:47 PM, Greg Young gregoryyou...@gmail.com wrote: Bounded Contexts are more about understanding that the word Order means two very different things when I talk to OrderManagement vs Shipping. If I try to use one single language for everything the language often becomes polluted in terms and we end up with supersets of concepts. You can host Bounded Contexts separately but this is a technical decision. Bounded Contexts exist at a mental model level On Friday, June 5, 2015 at 2:21:36 PM UTC+3, wonderful world wrote: In DDD, the Bounded Contexts separate some business concerns. For example, OrderManagement and Shipping are BCs. In a pure DDD implementation, they can be implemented as separate assemblies or libraries. They may contain an entity called Product which are different in structure and behavior. These BCs interact via a Bus or through polling. When those Bounded Context are implemented with Akka, should those be in two different ActorSystems so they don't have access to each other but communicate via Bus? What is the pattern to be used? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] logback turboFilter and akka LoggingReceive incompatible?
Why deduplicate non-logged events at all? Doing the substitution but not incurring the IO cost must be worth it? -- Cheers, √ On 22 Jun 2015 11:09, Roland Kuhn goo...@rkuhn.info wrote: It depends on what you want to achieve: not doing the substitution for non-logged events is an explicit goal in our infrastructure. Regards, Roland 22 jun 2015 kl. 11:02 skrev Viktor Klang viktor.kl...@gmail.com: Doing the filtering pre-substitution seems like a bug. -- Cheers, √ On 22 Jun 2015 01:54, Sam Halliday sam.halli...@gmail.com wrote: Patrik, Thanks for investigating! You saved me a few hours off my Monday as I was going to go through this in detail and put together a minimal test case :-) Unfortunately, your conclusion seems to be pretty damning. It might just be this one turbofilter that is incompatible with akka, but its likely there are more. Can you think of any workarounds, other that doing duplicate filtering in the application tier? (eek!) I could investigate writing my own turbofilter that handles the {} case... performance is not a major concern here as the consequences of not duplicate filtering is more far more significant than rendering log messages twice. Best regards, Sam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. Patrik Nordwall patrik.nordw...@gmail.com writes: On Sun, Jun 21, 2015 at 4:04 PM, Sam Halliday sam.halli...@gmail.com wrote: Everything is DEBUG, and this is akka 2.3.11. ok We log these (and all other things) with Logger(logClass, logSource).debug({}, message.asInstanceOf[AnyRef]) {} is the part that the duplicate filter will use to identify the log message, i.e. all log messages are identical :( This is not something that we have changed. /Patrik On Sunday, 21 June 2015 15:03:48 UTC+1, Patrik Nordwall wrote: What akka.loglevel are you using? I guess that you see the changed behavior in 2.4-M1 compared to 2.3.11. In 2.4 we have added a check in LoggingReceive that it will only be active if the loglevel is DEBUG. if (context.system.eventStream.logLevel = Logging.DebugLevel) { /Patrik On Thu, Jun 18, 2015 at 11:01 PM, Sam Halliday sam.ha...@gmail.com wrote: Hi all, I'm seeing something very weird. When I enable the logback turboFilter DuplicateMessageFilter: http://logback.qos.ch/manual/filters.html#DuplicateMessageFilter https://github.com/qos-ch/logback/blob/master/logback-classic/src/main/java/ch/qos/logback/classic/turbo/DuplicateMessageFilter.java all my LoggingReceive messages go silent. Uncommenting the one line in my config that enables the filter is the only change and introduces the regression. Latest scala, latest akka, latest logback. I can workaround it at the moment, but this is extremely concerning because I have used the duplicate filter in chatty production systems in the past and to see an incompatibility between two stable libraries is never good. Turning on logback debug (when it reads its own config) is looking good, no problems reported. Best regards, Sam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options
Re: [akka-user] logback turboFilter and akka LoggingReceive incompatible?
Doing the filtering pre-substitution seems like a bug. -- Cheers, √ On 22 Jun 2015 01:54, Sam Halliday sam.halli...@gmail.com wrote: Patrik, Thanks for investigating! You saved me a few hours off my Monday as I was going to go through this in detail and put together a minimal test case :-) Unfortunately, your conclusion seems to be pretty damning. It might just be this one turbofilter that is incompatible with akka, but its likely there are more. Can you think of any workarounds, other that doing duplicate filtering in the application tier? (eek!) I could investigate writing my own turbofilter that handles the {} case... performance is not a major concern here as the consequences of not duplicate filtering is more far more significant than rendering log messages twice. Best regards, Sam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. Patrik Nordwall patrik.nordw...@gmail.com writes: On Sun, Jun 21, 2015 at 4:04 PM, Sam Halliday sam.halli...@gmail.com wrote: Everything is DEBUG, and this is akka 2.3.11. ok We log these (and all other things) with Logger(logClass, logSource).debug({}, message.asInstanceOf[AnyRef]) {} is the part that the duplicate filter will use to identify the log message, i.e. all log messages are identical :( This is not something that we have changed. /Patrik On Sunday, 21 June 2015 15:03:48 UTC+1, Patrik Nordwall wrote: What akka.loglevel are you using? I guess that you see the changed behavior in 2.4-M1 compared to 2.3.11. In 2.4 we have added a check in LoggingReceive that it will only be active if the loglevel is DEBUG. if (context.system.eventStream.logLevel = Logging.DebugLevel) { /Patrik On Thu, Jun 18, 2015 at 11:01 PM, Sam Halliday sam.ha...@gmail.com wrote: Hi all, I'm seeing something very weird. When I enable the logback turboFilter DuplicateMessageFilter: http://logback.qos.ch/manual/filters.html#DuplicateMessageFilter https://github.com/qos-ch/logback/blob/master/logback-classic/src/main/java/ch/qos/logback/classic/turbo/DuplicateMessageFilter.java all my LoggingReceive messages go silent. Uncommenting the one line in my config that enables the filter is the only change and introduces the regression. Latest scala, latest akka, latest logback. I can workaround it at the moment, but this is extremely concerning because I have used the duplicate filter in chatty production systems in the past and to see an incompatibility between two stable libraries is never good. Turning on logback debug (when it reads its own config) is looking good, no problems reported. Best regards, Sam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed
Re: [akka-user] Re: Akka and Mesos: Will it blend?
Akka 2.4 solves the docker problem with supporting an external address and one internal. -- Cheers, √ On 18 Jun 2015 19:28, Jason Martens m...@jasonmartens.com wrote: I have heard from another co-worker that Akka remoting is very difficult on Mesos using Docker, because of the random ports that it uses to connect. Docker expects you to specify which ports are going to be used by your container, and if you don't know ahead of time it can be difficult to support. Keep that in mind when considering deploying using a container. Jason On Wednesday, June 17, 2015 at 12:36:52 PM UTC-7, Brian Topping wrote: Hi all, Just curious if anyone on the list has tried to blend Akka and Mesos at a level that is more granular than the JVM. For those that have heard about it but don't know much about it's core, Mesos is using kernel cgroups to provide ultra light weight virtualization with very low overhead for startup and shutdown as well as being able to provide resource constraints to requesting entities. In one view, it might not make sense to go more granular than the JVM. After all, Mesos can't run JVM classes without one. On the other hand, I'm thinking it might be very useful to have a standardized Akka container that would allow actor distribution with consistent stream-oriented APIs that took care of all the clustering concerns with robust, well-established techniques. There's certainly a lot that can be done here, for instance with warm nodes that are pooled for usage spikes and could reduce first transaction latency even further with no change to semantics. WDYT? Brian -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
Hi Eric, You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows: val src = Source(immutable.Seq(1,2,3)) val flo = Flow[Int].map(_ * 2) val sin = Sink.foreach(println) val runFlow = (src via flo).toMat(sin)(Keep.right) val fut = runFlow.run() fut.onComplete(_ = sys.shutdown()) On Thu, Jun 11, 2015 at 6:26 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: I have some simple code logger.info( Hello World! ) implicit val system = ActorSystem(System) import system.dispatcher logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(Create sink1 that can be connected to the Source ) val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1 ) val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) // How do I know when materializedFlow1 is finished so I can shut down the actor system? Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
I'd love to see some improvment in the handling of that. On Thu, Jun 11, 2015 at 6:40 PM, Viktor Klang viktor.kl...@gmail.com wrote: Hi Eric, You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows: val src = Source(immutable.Seq(1,2,3)) val flo = Flow[Int].map(_ * 2) val sin = Sink.foreach(println) val runFlow = (src via flo).toMat(sin)(Keep.right) val fut = runFlow.run() fut.onComplete(_ = sys.shutdown()) On Thu, Jun 11, 2015 at 6:26 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: I have some simple code logger.info( Hello World! ) implicit val system = ActorSystem(System) import system.dispatcher logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(Create sink1 that can be connected to the Source ) val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) // How do I know when materializedFlow1 is finished so I can shut down the actor system? Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
On 16 Jun 2015 11:53, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 11:50 AM, Viktor Klang viktor.kl...@gmail.com wrote: On 16 Jun 2015 11:18, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang viktor.kl...@gmail.com wrote: On 16 Jun 2015 10:26, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang viktor.kl...@gmail.com wrote: Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest. Well, but then that can be confused with combining elements. Currently, while Mat does not sound nice, at least it makes it clear that there is something you need to understand, while combine will make it easier to misunderstand the purpose. Trust me, the current name is highly confusing too. :) I am yet to see a naming proposal that is not confusing or horrible in different ways :) One alternative is to have 'to' always require a function to combine. That is very cumbersome. Try to rewrite the code examples in the documentation and see how it looks like. Makes it apparent that the user needs to care about it. Or make that function an implicit? Implicits won't work well, because there are different types involved. Again, I propose to try it out first on the documentation examples, or our test suite. I am currently testing different approaches, will report back! -Endre The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation. I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov. What is your proposal then? See above If the DSL would not be lifted but eager, these confusions would go away -- at the cost of not being able to introspect stream layouts anymore. Yup, fortunately this is not something that needs any changes :-) -Endre -- Cheers, √ On 16 Jun 2015 10:00, Endre Varga endre.va...@typesafe.com wrote: On Fri, Jun 12, 2015 at 10:55 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: Thanks; so my new code is logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(create redundant flow1 because of Akka Streams API design limitations) val flow1 = Flow[Int].map(int = int) logger.info(Create sink1 that can be connected to the Source ) //val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } val sink1 = Sink.foreach{ int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) //val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) val runnableFlow1 = (source via flow1).toMat(sink1)(Keep.right) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) materializedFlow1.onComplete(result = system.shutdown()) I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the toMat (terribly named method, I agree). Keep is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) = b)` Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to? I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`. Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does. In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear. Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()? Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing
Re: [akka-user] When deployed my system hangs when creating ActorSystem
What does JConsole say the threads are stuck on? On Fri, Jun 12, 2015 at 4:04 AM, Dennis Jönsson zeleb...@gmail.com wrote: Hello, I have been developing a cluster solution in akka for awhile. It is working fine together with the old code in my development environment. But now when its time to deploy the solution and test it live I run into a strange problem. The program simply stops doing anything when doing ActorSystem.create here, the last log is the one right above, nothing happens after that system seems to hang. private void startAkkaClusterMember(boolean master, int port) { Config conf = null; log.debug(Creating config for actor system); if (master) { conf = ConfigFactory.parseString(akka.cluster.roles=[master]).withFallback(ConfigFactory.parseString(akka.remote.netty.tcp.port= + port)) .withFallback(ConfigFactory.load(application)); } else { conf = ConfigFactory.parseString(akka.cluster.roles=[worker]).withFallback(ConfigFactory.parseString(akka.remote.netty.tcp.port= + port)) .withFallback(ConfigFactory.load(application)); } if (master) { log.debug(Trying to create ActorSystem); - system = ActorSystem.create(ClusterSystem, conf); log.debug(System created + system.name()); this.actor = system.actorOf(Props.create(ClusterMemberActor.class), master); system.actorOf( ClusterSingletonManager.defaultProps(Props.create(ActiveMasterActor.class, this.actor), master, PoisonPill.getInstance(), master), active); } else { system = ActorSystem.create(ClusterSystem, conf); this.actor = system.actorOf(Props.create(ClusterMemberActor.class), worker); } log.debug(Done creating actor system + this.actor); } This is my config file, I have confirmed that it gets read properly by logging the contents akka { loggers = [akka.event.slf4j.Slf4jLogger] loglevel = DEBUG logging-filter = akka.event.slf4j.Slf4jLoggingFilter actor { provider = akka.cluster.ClusterActorRefProvider } remote { log-remote-lifecycle-events = off netty.tcp { hostname = 127.0.0.1 port = 0 } } cluster { seed-nodes = [ akka.tcp://ClusterSystem@127.0.0.1:2551, akka.tcp://ClusterSystem@127.0.0.1:2552] auto-down-unreachable-after = 5s failure-detector { threshold = 8.0 min-std-deviation = 100 ms acceptable-heartbeat-pause = 3 s } } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: Thanks; so my new code is logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(create redundant flow1 because of Akka Streams API design limitations) val flow1 = Flow[Int].map(int = int) logger.info(Create sink1 that can be connected to the Source ) //val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } val sink1 = Sink.foreach{ int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1 ) //val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) val runnableFlow1 = (source via flow1).toMat(sink1)(Keep.right) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) materializedFlow1.onComplete(result = system.shutdown()) I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the toMat (terribly named method, I agree). Keep is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) = b)` Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to? I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`. Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does. In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear. Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()? Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as: def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3] - Eric On Thursday, 11 June 2015 15:40:48 UTC-7, √ wrote: Hi Eric, You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows: val src = Source(immutable.Seq(1,2,3)) val flo = Flow[Int].map(_ * 2) val sin = Sink.foreach(println) val runFlow = (src via flo).toMat(sin)(Keep.right) val fut = runFlow.run() fut.onComplete(_ = sys.shutdown()) On Thu, Jun 11, 2015 at 6:26 PM, Eric Kolotyluk eric.ko...@gmail.com wrote: I have some simple code logger.info( Hello World! ) implicit val system = ActorSystem(System) import system.dispatcher logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(Create sink1 that can be connected to the Source ) val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) // How do I know when materializedFlow1 is finished so I can shut down the actor system? Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group
Re: [akka-user] When deployed my system hangs when creating ActorSystem
I suspect you'll have to use the same java command as Eclipse generates when it starts. (and see what is different) On Fri, Jun 12, 2015 at 9:07 AM, Dennis Jönsson zeleb...@gmail.com wrote: Yeah I have tried it on several different computers with different OS, its the same thing. And this works just fine when run from eclipse but deployed it does not work and I get zero indication on why. Den fredag 12 juni 2015 kl. 14:55:54 UTC+2 skrev √: I'd test your program on another computer. On Fri, Jun 12, 2015 at 8:51 AM, Dennis Jönsson zele...@gmail.com wrote: I cant get jconsole to work. I run jconsole PID but it just tells me its an invalid PID. Den fredag 12 juni 2015 kl. 14:24:11 UTC+2 skrev √: What does JConsole say the threads are stuck on? On Fri, Jun 12, 2015 at 4:04 AM, Dennis Jönsson zele...@gmail.com wrote: Hello, I have been developing a cluster solution in akka for awhile. It is working fine together with the old code in my development environment. But now when its time to deploy the solution and test it live I run into a strange problem. The program simply stops doing anything when doing ActorSystem.create here, the last log is the one right above, nothing happens after that system seems to hang. private void startAkkaClusterMember(boolean master, int port) { Config conf = null; log.debug(Creating config for actor system); if (master) { conf = ConfigFactory.parseString(akka.cluster.roles=[master]).withFallback(ConfigFactory.parseString(akka.remote.netty.tcp.port= + port)) .withFallback(ConfigFactory.load(application)); } else { conf = ConfigFactory.parseString(akka.cluster.roles=[worker]).withFallback(ConfigFactory.parseString(akka.remote.netty.tcp.port= + port)) .withFallback(ConfigFactory.load(application)); } if (master) { log.debug(Trying to create ActorSystem); - system = ActorSystem.create(ClusterSystem, conf); log.debug(System created + system.name()); this.actor = system.actorOf(Props.create(ClusterMemberActor.class), master); system.actorOf( ClusterSingletonManager.defaultProps(Props.create(ActiveMasterActor.class, this.actor), master, PoisonPill.getInstance(), master), active); } else { system = ActorSystem.create(ClusterSystem, conf); this.actor = system.actorOf(Props.create(ClusterMemberActor.class), worker); } log.debug(Done creating actor system + this.actor); } This is my config file, I have confirmed that it gets read properly by logging the contents akka { loggers = [akka.event.slf4j.Slf4jLogger] loglevel = DEBUG logging-filter = akka.event.slf4j.Slf4jLoggingFilter actor { provider = akka.cluster.ClusterActorRefProvider } remote { log-remote-lifecycle-events = off netty.tcp { hostname = 127.0.0.1 port = 0 } } cluster { seed-nodes = [ akka.tcp://ClusterSystem@127.0.0.1:2551, akka.tcp://ClusterSystem@127.0.0.1:2552] auto-down-unreachable-after = 5s failure-detector { threshold = 8.0 min-std-deviation = 100 ms acceptable-heartbeat-pause = 3 s } } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to
Re: [akka-user] Error from non-actor execution context disappears
Hi Jonas, Alla is not involved in that execution path. Use Akka Dispatcher for the EC to the future iso the global pool. -- Cheers, √ On 15 Jun 2015 16:55, Jonas K thejo...@gmail.com wrote: I have an actor that calls code that returns a Future using an ExecutionContext different from the one used by the actor, as shown below. class BadActor extends Actor with ActorLogging { def receive = { case BadActor.Initialize = log.info(Received Initialize) SomethingAsync.doIt } } object SomethingAsync { import scala.concurrent.ExecutionContext.Implicits.global def doIt: Future[Int] = Future { throw new OutOfMemoryError() 5 } } If the code throws an Error, the stacktrace is printed out, but the jvm does not exit, though akka.jvm-exit-on-fatal-error = on. If I pass in the Actor's dispatcher as an ExecutionContext to the failing code, the jvm exits as desired. Should I always use the Actor's ExecutionContext in any code that it calls? How do I handle 3rd party libraries that manage their own ExecutionContexts internally (eg. Slick)? Thanks for any advice, _jonas -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
On 16 Jun 2015 10:26, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang viktor.kl...@gmail.com wrote: Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest. Well, but then that can be confused with combining elements. Currently, while Mat does not sound nice, at least it makes it clear that there is something you need to understand, while combine will make it easier to misunderstand the purpose. Trust me, the current name is highly confusing too. :) The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation. I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov. If the DSL would not be lifted but eager, these confusions would go away -- at the cost of not being able to introspect stream layouts anymore. Yup, fortunately this is not something that needs any changes :-) -Endre -- Cheers, √ On 16 Jun 2015 10:00, Endre Varga endre.va...@typesafe.com wrote: On Fri, Jun 12, 2015 at 10:55 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: Thanks; so my new code is logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(create redundant flow1 because of Akka Streams API design limitations) val flow1 = Flow[Int].map(int = int) logger.info(Create sink1 that can be connected to the Source ) //val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } val sink1 = Sink.foreach{ int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) //val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) val runnableFlow1 = (source via flow1).toMat(sink1)(Keep.right) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) materializedFlow1.onComplete(result = system.shutdown()) I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the toMat (terribly named method, I agree). Keep is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) = b)` Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to? I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`. Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does. In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear. Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()? Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as: def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3] This is how it looked like originally. Unfortunately, this way you lose type inference of the function argumetns for combine -- hence we changed to curried form. Unfortunately that means that you can no longer override a non-curried version with a curried one, so you need to introduce a new name to the combine variant. You can name that toAndCombineMaterializedValue but honestly, I don't think that helps at all with understanding. It is not the name that causes the confusion but the concept of materialized values, which cannot be dropped if we want to have the lifted representation that we have now. -Endre - Eric On Thursday, 11 June 2015 15:40:48 UTC-7, √ wrote: Hi Eric, You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows: val src = Source(immutable.Seq(1,2,3)) val flo = Flow[Int].map(_ * 2) val sin = Sink.foreach(println) val runFlow = (src via flo).toMat(sin)(Keep.right) val fut = runFlow.run
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest. -- Cheers, √ On 16 Jun 2015 10:00, Endre Varga endre.va...@typesafe.com wrote: On Fri, Jun 12, 2015 at 10:55 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: Thanks; so my new code is logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(create redundant flow1 because of Akka Streams API design limitations) val flow1 = Flow[Int].map(int = int) logger.info(Create sink1 that can be connected to the Source ) //val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } val sink1 = Sink.foreach{ int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) //val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) val runnableFlow1 = (source via flow1).toMat(sink1)(Keep.right) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) materializedFlow1.onComplete(result = system.shutdown()) I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the toMat (terribly named method, I agree). Keep is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) = b)` Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to? I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`. Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does. In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear. Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()? Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as: def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3] This is how it looked like originally. Unfortunately, this way you lose type inference of the function argumetns for combine -- hence we changed to curried form. Unfortunately that means that you can no longer override a non-curried version with a curried one, so you need to introduce a new name to the combine variant. You can name that toAndCombineMaterializedValue but honestly, I don't think that helps at all with understanding. It is not the name that causes the confusion but the concept of materialized values, which cannot be dropped if we want to have the lifted representation that we have now. -Endre - Eric On Thursday, 11 June 2015 15:40:48 UTC-7, √ wrote: Hi Eric, You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows: val src = Source(immutable.Seq(1,2,3)) val flo = Flow[Int].map(_ * 2) val sin = Sink.foreach(println) val runFlow = (src via flo).toMat(sin)(Keep.right) val fut = runFlow.run() fut.onComplete(_ = sys.shutdown()) On Thu, Jun 11, 2015 at 6:26 PM, Eric Kolotyluk eric.ko...@gmail.com wrote: I have some simple code logger.info( Hello World! ) implicit val system = ActorSystem(System) import system.dispatcher logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(Create sink1 that can be connected to the Source ) val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) // How do I know when materializedFlow1 is finished so I can shut down the actor system? Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received
Re: [akka-user] Akka Streams: How do I know when a flow is finished?
On 16 Jun 2015 11:18, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang viktor.kl...@gmail.com wrote: On 16 Jun 2015 10:26, Endre Varga endre.va...@typesafe.com wrote: On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang viktor.kl...@gmail.com wrote: Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest. Well, but then that can be confused with combining elements. Currently, while Mat does not sound nice, at least it makes it clear that there is something you need to understand, while combine will make it easier to misunderstand the purpose. Trust me, the current name is highly confusing too. :) I am yet to see a naming proposal that is not confusing or horrible in different ways :) One alternative is to have 'to' always require a function to combine. Makes it apparent that the user needs to care about it. Or make that function an implicit? The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation. I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov. What is your proposal then? See above If the DSL would not be lifted but eager, these confusions would go away -- at the cost of not being able to introspect stream layouts anymore. Yup, fortunately this is not something that needs any changes :-) -Endre -- Cheers, √ On 16 Jun 2015 10:00, Endre Varga endre.va...@typesafe.com wrote: On Fri, Jun 12, 2015 at 10:55 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: Thanks; so my new code is logger.info(Create a Source based on a simple Iterable[T]) val source = Source(1 to 10) logger.info(create redundant flow1 because of Akka Streams API design limitations) val flow1 = Flow[Int].map(int = int) logger.info(Create sink1 that can be connected to the Source ) //val sink1 = Sink.foreach { int: Int = logger.info(sink1: + int) } val sink1 = Sink.foreach{ int: Int = logger.info(sink1: + int) } logger.info(Connect the Source to the sink1, obtaining a runnableFlow1) //val runnableFlow1: RunnableFlow[Unit] = source.to(sink1) val runnableFlow1 = (source via flow1).toMat(sink1)(Keep.right) logger.info(Create flowMaterializer1) val flowMaterializer1 = ActorFlowMaterializer() logger.info(Materialize runnableFlow1 as materializedFlow1) val materializedFlow1 = runnableFlow1.run()(flowMaterializer1) materializedFlow1.onComplete(result = system.shutdown()) I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the toMat (terribly named method, I agree). Keep is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) = b)` Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to? I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`. Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does. In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear. Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()? Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as: def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3] This is how it looked like originally. Unfortunately, this way you lose type inference of the function argumetns for combine -- hence we changed to curried form. Unfortunately that means that you can no longer override a non-curried version with a curried one, so you need to introduce a new name to the combine variant. You can name that toAndCombineMaterializedValue but honestly, I don't think that helps at all with understanding
Re: [akka-user] Akka http load balancer
On Wed, May 27, 2015 at 2:01 PM, Endre Varga endre.va...@typesafe.com wrote: Hi, Instead of Http.request, you should use the Flow returned by Http.superPool() (see http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/http/client-side/request-level.html). That flattens out the Futures and you get responses instead. That also makes Balance actually aware of response times. OTOH I don't think performance wise akka-http is currently up to the task of being a balancer. …at the moment. -Endre On Wed, May 27, 2015 at 1:21 PM, zergood zergoodso...@gmail.com wrote: Hello! I have a task to develop a http balance loader for my 2 servers. Here is my qucik and very dirty implementation https://gist.github.com/zergood/e705cd6ce4cfec47c0a5. The main problem with it is performance, this solution is slower than my single server. What is the reason of performance degradation? Could you give me any advices how to make http load balancer with akka-http? I am using scala-2.11 and akka-http 1.0-RC3. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka http load balancer
On Wed, May 27, 2015 at 2:33 PM, Viktor Klang viktor.kl...@gmail.com wrote: On Wed, May 27, 2015 at 2:01 PM, Endre Varga endre.va...@typesafe.com wrote: Hi, Instead of Http.request, you should use the Flow returned by Http.superPool() (see http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/http/client-side/request-level.html). That flattens out the Futures and you get responses instead. That also makes Balance actually aware of response times. OTOH I don't think performance wise akka-http is currently up to the task of being a balancer. …at the moment. Doh, that currently eluded me, see my response as emphasis :) -Endre On Wed, May 27, 2015 at 1:21 PM, zergood zergoodso...@gmail.com wrote: Hello! I have a task to develop a http balance loader for my 2 servers. Here is my qucik and very dirty implementation https://gist.github.com/zergood/e705cd6ce4cfec47c0a5. The main problem with it is performance, this solution is slower than my single server. What is the reason of performance degradation? Could you give me any advices how to make http load balancer with akka-http? I am using scala-2.11 and akka-http 1.0-RC3. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] question on getting mailbox confirmation
Hi Shawn, On Mon, May 25, 2015 at 4:29 PM, Shawn Garner shawndgar...@gmail.com wrote: The arbiter sounds like what we would need. Are there any activator templates or example of someone using an arbiter in Akka you can point me to? This way I could demo it to my coworker without having to write something from scratch? You could most probably illustrate it on a whiteboard in a simple step-by-step way. But AFAIK there's no pre-cooked version. A lot of the problems at my company are cultural rather than matching problems to solutions like you said. We have a lot of shared projects which are basically canned solutions to standard problems and are basically required to use. If a solution doesn't look exactly the same and fit into the same paradigm then it is not viewed as viable. It's viewed as resolving a problem that has already been solved. I understand, and have had similar situations in the past. The question that usually helps here is why change is being considered if the existing solutions work so well? So we're talking about an aging set of canned solutions which are getting to be about 10 years old without anyone considering anything else along the way. The people who created those are still at the company and very stuck on them. Part of it is rampant sunken cost fallacy. Oh, I hear you. Territorialism can be extremely destructive and a source of much inertia. Something that always resonated with me is You have to pick people up where they are and not where you want them to be., is there a common ground/understanding/position that you can build from? We have a lot of batch import/export jobs which instead of run daily could be modeled as reacting to events in runtime with Akka. (I've been thinking/searching about how to get events into Akka when certain tables change or get data inserted into them without writing code to detect such events. aka Event sourcing from SQL Server database) We are having some growing pains where our batch jobs are taking too long. We have admin pages which don't update themselves because they need to wait on large batch imports in a single transaction and dirty reads are not allowed in our databases at the db server level. Check. That's where the illusion of a strongly consistent reality starts to break down. Seems like a good time to take a step back and focus on the real problem rather than the symptom (the mess in the DB). So in my mind Akka is a viable solution (and a much preferable one) to a problem over something like Spring Batch and Spring Integration. Those Spring things are also mostly canned solutions which look like what our current shared solutions expect so they would always be preferred over Akka. Absolutely. Is this a use-case for Akka Streams perhaps? Thanks, Shawn On Monday, May 25, 2015 at 4:53:58 AM UTC-5, √ wrote: Hi Shawn, The use case you're referring to (transitive ordering) I think traditionally is accomplished in the actor model with the use of arbiters: https://en.wikipedia.org/wiki/Indeterminacy_in_concurrent_computation I.e. you need a single source of truth w.r.t. the ordering, and since Actors are islands of order in a sea of chaos you just need A and B to agree on an intermediary when communicating with C, as you say, remoting would otherwise be very interesting (and different mailbox implementations may give you different semantics). Burdening all communication with acks (which could get lost as well, and the original message too) just because it is needed in some cases, would not be responsible :) In general, with your coworker, I suspect it is a matter of mapping problems to solutions rather than solutions to solutions (i.e. it may not be meaningful to map all solution in his/her world to the world of actors, but instead focus on how certain problems are solved with actors and then he/she can do the mapping from other-technology-prefered solution to the actor-solution themselves) Does that make sense? On Sat, May 23, 2015 at 4:41 PM, Shawn Garner shawnd...@gmail.com wrote: I was talking with a coworker and he has some custom behavior he can't understand how to do anything useful without. He want's an tell message (a') sent from A - C to not allow actor A to continue until after there is confirmation that A's message is in C's mailbox. This way if a sends (a'') to from A - B and B sends a message (b') fro B - C that C should process messages always in the defined order a',b'. I was looking at the mailboxes and it sees like even if you use the UnboundedPriorityMailbox and it seems to me that with a tell message it probably is not blocking the sending actor but some kind of internal component of Akka. So I'm thinking UnboundedPriorityMailbox breaks down when you start talking remote actors and http actors in that the first one deserialized would block the other one but not block the sender until the message is in the mailbox. Also I'm
Re: [akka-user] Java: Yet another get or create actor question
In the mean time there's getChild + Optional.ofNullable On Tue, Jun 2, 2015 at 11:43 AM, Roland Kuhn goo...@rkuhn.info wrote: Thanks for raising this, we should fix it https://github.com/akka/akka/issues/17635 for Akka 2.4 by offering a method that gives you an Optional instead of a Scala Option. Regards, Roland 1 jun 2015 kl. 02:50 skrev Guido Medina oxyg...@gmail.com: Is there any way to avoid the verbosity?, I try with a more functional approach which was OK for IntelliJ but not for the Java 8 compiler, here is the form that is working: private void applyToAccount(Currency currency, BalanceOperation operation) { context().child(currency.code). getOrElse(new AbstractFunction0OptionActorRef() { @Override public OptionActorRef apply() { return Option.some(context().actorOf(balancePersistorProps( currency), currency.code)); } }). get().forward(operation, context); } But I was hoping the following would work, but Java 8 can't infer the type, strange IntelliJ recognizes it I guess the mix of Java and Scala drives it crazy: private void applyToAccount(Currency currency, BalanceOperation operation) { context().child(currency.code). getOrElse(() - Option.some(context().actorOf(balancePersistorProps( currency), currency.code))). get().forward(operation, context); } Best regards. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. *Dr. Roland Kuhn* *Akka Tech Lead* Typesafe http://typesafe.com/ – Reactive apps on the JVM. twitter: @rolandkuhn http://twitter.com/#!/rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka stream - download large file in chunks
Hi, This? http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/http/client-side/index.html On Tue, Jun 2, 2015 at 12:01 PM, programad...@gmail.com wrote: Hi, I am trying to download a file from a website using the akka stream, because today I have a problem of heap size. I have been through the documentation and I haven't found any good examples. Anybody has any suggestion or examples how I can do it in a good way? Regards -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka stream - download large file in chunks
Hi, a good start, in my opinion, would be if you could show what you have tried and what is not working. On Tue, Jun 2, 2015 at 1:36 PM, programad...@gmail.com wrote: Thank you for it, but I had already the documentation link, as well as links from stack overflow and other links in github with some examples that don't work anymore.. I really think that all problems can be solved using in the documentation, I would like some specifics links and examples, as I am sure you read all the documentation and understood all you would gladly share more significant information. And if you have any suggestion how I could improve my question, please do. Cheers On Tuesday, June 2, 2015 at 12:33:03 PM UTC+2, √ wrote: Hi, This? http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/http/client-side/index.html On Tue, Jun 2, 2015 at 12:01 PM, progra...@gmail.com wrote: Hi, I am trying to download a file from a website using the akka stream, because today I have a problem of heap size. I have been through the documentation and I haven't found any good examples. Anybody has any suggestion or examples how I can do it in a good way? Regards -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-remote: If the configuration file contains an invalid settings, log message is null.
Hi okyongsu, thanks for the report! It does seem, at a glance, like the proposed fix merely treats the symptom rather than the problem. I wonder if there is a cleaner way to deal with this. On Thu, Jul 2, 2015 at 8:26 AM, okyongsu okyon...@gmail.com wrote: environment: - jdk 1.8.0_40-b26 - scala 2.11.7 - akka-remote 2.3.11 If *akka-remote* configuration contains an *invalid *settings, # setting log-frame-size-exceeding = off# valid log-frame-size-exceeding = 1000b # valid log-frame-size-exceeding = on # invalid (error prone) the following log message occurs. address is now gated for [5000] ms. Reason: [exception during creation] Caused by: [*null*] detail log message: [WARN] [07/02/2015 11:32:24.896] [pulse-service-web-akka.remote.default-remote-dispatcher-5] [akka.tcp:///system/endpointManager/reliableEndpointWriter-akka.tcp] Association with remote system [akka.tcp://] has failed, address is now gated for [5000] ms. Reason: [exception during creation] Caused by: [null] [ERROR] [07/02/2015 11:32:54.908] [pulse-service-web-akka.actor.default-dispatcher-22] [ActorSystem(pulse-service-web)] Error during processing of request HttpRequest(..) akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp:///), Path(/user/report)]] after [3 ms] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) and debug result. https://lh3.googleusercontent.com/-c9_Svwc4D5M/VZTUWzyd_LI/ELc/83o02GvrNDw/s1600/intellij-debugging.png How to solve it like this? diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index af3ec82..3c747b4 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -28,6 +28,7 @@ import scala.util.control.NonFatal import java.util.concurrent.locks.LockSupport import scala.concurrent.Future import scala.concurrent.blocking +import java.lang.reflect.InvocationTargetException /** * INTERNAL API @@ -210,7 +211,12 @@ private[remote] class ReliableDeliverySupervisor( override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ (_: AssociationProblem) ⇒ Escalate case NonFatal(e) ⇒ - val causedBy = if (e.getCause == null) else sCaused by: [${e.getCause.getMessage}] + + val causedBy = Option(e.getCause).withFilter(_ != null).flatMap { +case ec: InvocationTargetException ⇒ Option(ec.getCause).withFilter(_ != null).map(_.getMessage) +case ec⇒ Some(ec.getMessage) + }.fold()(message ⇒ sCaused by: [$message]) + log.warning(Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}, remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy) uidConfirmed = false // Need confirmation of UID again -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at
Re: [akka-user] Akka HTTP Streams vs Pulse Queues
Hi Eric, I'm not sure I can answer your very broad question, would you mind/be able to formulate a couple of more specific ones? On Fri, Jul 3, 2015 at 6:34 PM, Eric Kolotyluk eric.koloty...@gmail.com wrote: I have prototyped an application that basically multiplexes messages, assembles them into blocks, and then segments them. The application tries to do as much concurrently as possible. I have used something I call a 'pulse queue' which is based on a non-blocking concurrent queue. http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html Basically, I create a separate future to handle the processing of each message, and the futures complete their work by adding their results to the non-blocking queue. The pulse part is that periodically (i.e. each second) the queue is drained to create a block or segment. There are other reasons to drain a queue, i.e. size, but primarily it is the pulse that keeps data moving through the system. I have found this to be simple to code, and effective at handling large numbers of Futures concurrently. As I am trying to understand Akka Streams, and Akka HTTP, I am wondering if Streams would be a better or equivalent solution, but I do not really understand how streams work under the hood to answer my curiosity. For example, in the Streams environment is it possible to create a separate future to handle each message, the way the pulse queue does? My sense is that each element in the stream is an Actor under the hood, so that the messages (while non-blocking) would get serialized. Cheers, Eric -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Akka Stream Source as ActorPublisher
Hi Maxim, could you please minimize the problematic code into a self-contained, no extraneous pieces, snippet? On Mon, Jul 6, 2015 at 9:55 PM, Maxim Korolyov korolyov.ma...@gmail.com wrote: Hey, i was playing with akka stream and akka http and faced and issue, when source created as ActorPublisher doesn't publish any data to source. i have verified that `onNext` method of ActorPublisher is invoking and that the source is empty by adding a logging flow like this source.via(Flow[String]{s = println(s);s} Have any one faced this? there is a link to the code http://github.com/mkorolyov/akka-streams. In two words it is an websocked endpoint which expects in request message json string {isoCode:USD} and serves realtime currency rates updates + optionally history of rates by currency code. I am creating an ` actualSource` from the ActorPublisher and concatenating it with history source from DB in CurrecnyService.scala. Concatenated source is transferred as chucked message to websocket channel. Appreciate for any advise. Thanks -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] count, process message in Akka
Hi Samy, What's the use-case? On Mon, Jun 29, 2015 at 8:14 PM, Samy sethu.r...@gmail.com wrote: How to Query (or get all) messages with a specific sender in a custom mailbox, I followed Akka custom mailbox but I dont have any luck -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [Streams] Is ActorPublisher requested asynchronously or sequentially?
Hi Jakub, Starting to read your email I definitely thought there must be something mysterious at work! From what I can tell, there are a couple of compounding things here: 1) future.onComplete will be executed on another thread than the actor, or concurrently with the actor, this means that you can't close over the actor and call methods on it from another thread, see: http://doc.akka.io/docs/akka/2.3.11/additional/faq.html 2) when you call `Await` on the Future, you're only going to await it having a value, not await its callbacks to finish execute. So: 1. val f = someFuture(…) 2. f.onComplete { … } 3. Await.result(f, …) When line 3 executes, onComplete could have already executed, is (con)currently being executed or will be executed. Does that make sense? On Mon, May 25, 2015 at 11:03 AM, Jakub Liska liska.ja...@gmail.com wrote: Hi, in other words : def receive: Receive = { case Request(demand) if totalDemand 0 demand 0 isActive = // can it happen that another Request message comes before this partial function returns (while this one is being processed) ? } I have an asynchronous ActorProvider that is scanning ElasticSearch index, but I'm calling await at the end, so it is basically blocking : private var lastScrollId: String = _ def receive: Receive = { case Request(demand) if totalDemand 0 demand 0 isActive = def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = { require(scrollId != null scrollId.nonEmpty, Scroll id must be present!) scroll(scrollId) flatMap { case (sid, recs) if recs.isEmpty = // empty hits means end of scanning/scrolling Future.successful(Option.empty) case (sid, recs) = onNext(recs) if (n 1) pushRecursively(n-1, sid) else Future.successful(Option(sid)) } } val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId) f onComplete { case Failure(ex) = log.error(ex, Unexpected ScanSource error) onError(ex) context.stop(self) case Success(sidOpt) = sidOpt match { case None = log.info(ScanSource just completed...) if (isCompleted) log.warning(ScanSource already completed, I cannot figure out why this occurs!) else { onComplete() context.stop(self) } case Some(sid) = lastScrollId = sid } } f.await(600.seconds) case Cancel = context.stop(self) } But as you can see, there is log.warning sayig that onComplete() was already called, which can happen only if ActorPublisher wasn't Requested sequentially. I think this implementation is correct and valid even though it is blocking actor's dispatcher thread. But I really cannot figure out how it can be completed twice... -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] question on getting mailbox confirmation
Hi Shawn, The use case you're referring to (transitive ordering) I think traditionally is accomplished in the actor model with the use of arbiters: https://en.wikipedia.org/wiki/Indeterminacy_in_concurrent_computation I.e. you need a single source of truth w.r.t. the ordering, and since Actors are islands of order in a sea of chaos you just need A and B to agree on an intermediary when communicating with C, as you say, remoting would otherwise be very interesting (and different mailbox implementations may give you different semantics). Burdening all communication with acks (which could get lost as well, and the original message too) just because it is needed in some cases, would not be responsible :) In general, with your coworker, I suspect it is a matter of mapping problems to solutions rather than solutions to solutions (i.e. it may not be meaningful to map all solution in his/her world to the world of actors, but instead focus on how certain problems are solved with actors and then he/she can do the mapping from other-technology-prefered solution to the actor-solution themselves) Does that make sense? On Sat, May 23, 2015 at 4:41 PM, Shawn Garner shawndgar...@gmail.com wrote: I was talking with a coworker and he has some custom behavior he can't understand how to do anything useful without. He want's an tell message (a') sent from A - C to not allow actor A to continue until after there is confirmation that A's message is in C's mailbox. This way if a sends (a'') to from A - B and B sends a message (b') fro B - C that C should process messages always in the defined order a',b'. I was looking at the mailboxes and it sees like even if you use the UnboundedPriorityMailbox and it seems to me that with a tell message it probably is not blocking the sending actor but some kind of internal component of Akka. So I'm thinking UnboundedPriorityMailbox breaks down when you start talking remote actors and http actors in that the first one deserialized would block the other one but not block the sender until the message is in the mailbox. Also I'm thinking that UnboundedPriorityMailbox will just order messages sitting in the mailbox. If b' arrives before a', then b' may already be removed from the mailbox and being processed which the blocking and ordering of UnboundedPriorityMailbox will do nothing to help. Could you help me understand this? Is there anything built into Akka would give you this guarantee of order? Or would I have to build this in yourself with some kind of ack system with an ask vs a tell message waiting to send a'' until a' was ack'd as recieved and delegated to another worker actor D? My co-worker seems fixated on these aynch semantics because he want's it to behave like an asynch queue of ActiveMQ and feels Akka should provide something like that out of the box. I told him I don't think so but you get all the tools to build it yourself whatever semantics you want. Thanks, Shawn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.