[akka-user] Newbie Questions About PersistentView and Populating Read Datastores
I've been reading up here on PersistentActor, and I think I get how that works to perform commands and write the result to an event store. I also think I understand that PersistentViews can subscribe to a PersistentActor and receive notification of each event stored for that PersistentActor type. I want to take a PersistentView and use it to update a separate Read datastore. I don't want to treat the PersistentView itself as a read store, but I want to make it trigger the creation or updating or saving of a projection of the event in some other store, like, e.g., Elasticsearch or Postgresql. Are there any guidelines, best practices, or examples of how to do this? One thread (https://groups.google.com/forum/#!searchin/akka-user/persistentview/akka-user/rMHjwBZpocQ/SmfAGMg7G68J) from June 2014 seemed to indicate that this sort of writing to another store would require PersistentViews to be able to read from multiple PersistentActors for this to be feasible. Is this still true? What is the path to take here, to get the actor system populating my read stores? Are people instead creating projections directly from the event store itself, like Greg Young's EventStore allows? Any insight is welcome! Amiri -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Message Broadcasting in an akka cluster
Hi, Is there a way to broadcast a message across the nodes in a cluster without using an intermediate actor like a mediator actor in distributed pub-sub or a shard region actor in cluster sharding? BR, Vishnu -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Parallel operation on list using future
14 jul 2015 kl. 07:04 skrev Maatary Okouya maatarioko...@gmail.com: Hi, I was just wondering as a general advise if the following made sense. I have a list, that i need to filter according to the following criteria: Let say the list contain things of type A, B, C and D and i want to take n0 elt of A, n1 elt of B, n2 elt of C and n3 elt of D and then make one list out of it. the iterative approach is pretty clean (i.e. going over the all list, using 4 counters, adding elt to each list until each respective counter reached it limit i.e. n1, n2, n3, n4), but a colleague at work told me to take advantage of multiple cpu and parallelize the operation using future. in other words, launching 4 future operation that filter the list, and drop if it applies (i.e.resultinglist nx), resultinglist.size - n0 or n1 or n2 or n3 or n4. then await the result and combine the list. I think this is an overkill for something we use to do iteratively pretty easily. I just wonder what people think about that. Yes i can run a test and compare the speed, but it raise the question of, when exactly can we ensure that we are taking advantage of the multiple cpu architecture. Because indeed i understand the motivation behind the suggestion. However, i did not know how to tell that it might be counter productive. We were both stuck in debate and enable to state if it is good situation or a bad situation to use parallelization. In other word we did not have a criterion. Is testing the only way to know ? One way to look at such problems is to figure out what the costly part is and then ask yourself if that can meaningfully be partitioned (which is the prerequisite for benefiting from parallelism). In this case the expensive part is to traverse the list (touching all those memory locations) and the best you can do is to traverse it once, no partitioning possible. In general parallelism for processing collections is an allure that seldom leads to faster or more efficient programs—the collection would have to be huge and partitionable or the operation to be run needs to be expensive in order to realize gains. Occupying more CPUs is not a good purpose in itself. Regards, Roland Many thanks, M -- Read the docs: http://akka.io/docs/ http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com mailto:akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com mailto:akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout https://groups.google.com/d/optout. Dr. Roland Kuhn Akka Tech Lead Typesafe http://typesafe.com/ – Reactive apps on the JVM. twitter: @rolandkuhn http://twitter.com/#!/rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Sequence of events [OnComplete, OnNext, OnError] when using ActorSubscriber with akka streams
Hello All, We are using Akka Streams to process 400,000 xml documents, run it through series of transformations and then save it to a database. We are using basic transformation and here is how our stream code looks, Source(Set(allDocumentUris)) .map(uri = getDocumentFromNetwork(uri)) .map(doc = transformation1(doc)) .map(doc = saveToDatabase(doc)) .runWith(Sink(transformationStatusActorSubscriber)) We wanted to keep track of all successful and failed documents and be able to print summary at the end of transformation. We decided to use ActorSubscriber at end of stream because it allows us to keep data and keep updating it without worrying about thread-safety. Here is how our actor looks: class SomeActor extends ActorSubscriber { private var successful = Set.empty[String] private var failed = Set.empty[String] private var numberOfDocumentsToProcess = 0 private var complete = false context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress) override protected def requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = 10) override def receive: Receive = { case EchoProgress = echoProgress() case Count(size) = numberOfDocumentsToProcess = size case OnNext(element: (String, Future[Unit])) = element._2 onComplete { case Success(_) = successful = successful + element._1 processComplete() case Failure(error) = failed = failed + TransformationFailure(element._1, error) processComplete() } case OnError(error) = context.stop(self) case OnComplete = complete = true } private def isStreamComplete = { val totalDocumentsProcessedSoFar = successful.size + failed.size complete (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar) } private def processComplete() { if (isStreamComplete) { echoSummary() context.stop(self) } } private def echoProgress() ... private def echoSummary() ... } Problems that we are facing 1. Sequence of messages? We are not sure about the sequence of events that actor receives. Is it possible that Actor will receive OnComplete first but then some OnNext messages are still in queue? 2. When to stop Actor? What is the correct way to stop Actor? Right now we are stopping Actor in OnError event and OnNext event (for OnNext - we check if we have processed all documents and also have already received OnComplete event). If we stop Actor in OnError and OnComplete event will it work? 3. OutOfMemory issues? We ran stream with 2 GB memory but we faced OutOfMemory error before stream completed. Because backpressure is mandatory, we thought that this will not happen. We increased memory to 4 GB and after that program executed without OutOfMemory error. Did we miss anything in our implementation, how can we ensure that we will never get OutOfMemory error irrespective of memory size available to program? 4. Program dies abruptly Right now our current code stops at some point before it has processed all documents. From our observation we think that it stops after OnComplete message is received, but in OnComplete event we are not stopping the Actor. We are not sure how to debug/fix this behavior. Appreciate any help/suggestions on this. -Regards Ajay -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] SslTls example
Hi, Is there an example available where SslTls is used with Akka HTTP client/server? Thanks, Mathias -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Newbie Questions About PersistentView and Populating Read Datastores
Hi I looked into this question in the past. There are some long threads in the mailing lists and aggregated views seems to be something that will be considered for Akka in the future (unless I misunderstood the threads) but it is only after akka-http and akka-streams are matured. As for doing something right now, here are some patterns I reasoned about (these would keep the read model in memory in the actor, but that is just a detail): *Pull model:* In the pull model you are piggybacking the event log to double up as a communication bus between services that have no other dependencies. 1. Create a Parent aggregate actor with a hierarchy of PersistentView children. This works best for idempotent information and where eventually consistent is alright and you no requirements on ordering of the messages across the children. 2. Create a Parent aggregate PersistentActor with a hierarchy of PersistentView children. Have each PersistentView child include the sequence id and keep a tally in the PersistentActor of what has been received so far from each child. Drop duplicated messages and on recovery trigger replay in the children starting from the sequence number you have for that view. You now have guaranteed replay ordering that is guaranteed across restarts. You are paying by increased data storage. *Push model:* Akka PersistentViews queries the datasource every 4-5 sek by default. If you worry much about the time it takes for propagation and do not want to hammer the database for quick updates, you can implement a push version of above. Please note that this introduces an extra write for the channel. Instead of having PersistentViews you send the messages from each original PersistentActor over a channel to an Aggregated PersistentActor. The Aggregated actor persists the aggregated view. When you do this what you loose in complexity for not having PersistentView children you pay in complexity by having services that need to know about each other and depends on being up at the same time. Otherwise your dependency is only the data model/protocol at runtime. /Magnus Den tisdag 14 juli 2015 kl. 09:20:29 UTC+2 skrev Amiri Barksdale: I've been reading up here on PersistentActor, and I think I get how that works to perform commands and write the result to an event store. I also think I understand that PersistentViews can subscribe to a PersistentActor and receive notification of each event stored for that PersistentActor type. I want to take a PersistentView and use it to update a separate Read datastore. I don't want to treat the PersistentView itself as a read store, but I want to make it trigger the creation or updating or saving of a projection of the event in some other store, like, e.g., Elasticsearch or Postgresql. Are there any guidelines, best practices, or examples of how to do this? One thread ( https://groups.google.com/forum/#!searchin/akka-user/persistentview/akka-user/rMHjwBZpocQ/SmfAGMg7G68J) from June 2014 seemed to indicate that this sort of writing to another store would require PersistentViews to be able to read from multiple PersistentActors for this to be feasible. Is this still true? What is the path to take here, to get the actor system populating my read stores? Are people instead creating projections directly from the event store itself, like Greg Young's EventStore allows? Any insight is welcome! Amiri -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Can I avoid a class cast when using outlets in FlowShape?
Hi Thanks Victor, you're right! For the record I did try this before writing the post above, but it wouldn't work. I must have had some other type error at the same time that confused me. Because now it worked fine! /Magnus Den söndag 12 juli 2015 kl. 15:39:10 UTC+2 skrev √: On Sun, Jul 12, 2015 at 1:18 PM, Magnus Andersson magnus.a...@mollyware.se javascript: wrote: Hi I have to do a class cast to ge able to get a typed outlet from a flow that I used in a FlowGraph. 1. I'm wondering if I'm using the API wrong below, can I get the outlet in a different way? 2. Other suggestions for improvement apart from the single line is welcome, perhaps my whole approach is off? Code background: I'm forking off a call to a cache service, which in turn is an actor that periodically fetches a fresh token from Gcloud and caches it for the duration of the token validity. If I got a token returned then enrich the HttpRequest with an Authorization header. val addAuthenticationToken:Flow[HttpRequest, Option[HttpRequest]] = Flow() { implicit b ⇒ import FlowGraph.Implicits._ val bcast = b.add( Broadcast[HttpRequest]( 2 ) ) val zip = b.add( Zip[Option[Token], HttpRequest] ) val fetchToken:FlowShape[HttpRequest, Option[Token]] = b.add( fetchTokenService ) val addToken:FlowShape[(Option[Token], HttpRequest)] = b.add( mapTokenRequest ) bcast ~ fetchToken ~ zip.in0 bcast ~ zip.in1 zip.out ~ addToken // THIS CLASS CAST IS BUGGING ME. AM I USING THE API WRONG HERE? val output = addToken.outlets.head.asInstanceOf[Outlet[Option[HttpRequest]]] ( bcast.in, output ) } This is the reason the class cast is necessary, below shows what the source code for FlowShape in Akka looks like this: final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) // === NO TYPES HERE override val outlets: immutable.Seq[Outlet[_]] = List(outlet) // === NO TYPES HERE ... } Suggestions appreciated, thanks! Use `addToken.outlet` (as per signature below) final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, sproposed inlets [${inlets.mkString(, )}] do not fit FlowShape) require(outlets.size == 1, sproposed outlets [${outlets.mkString(, )}] do not fit FlowShape) FlowShape(inlets.head, outlets.head) } } /Magnus -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: SslTls example
Hi Mathias, there may not be any good examples yet. On the server side you need to supply an `HttpsContext` to the `Http.bind()` method with all the SSL settings. On the client side there are either HTTPS variants like `Http.newHostConnectionPoolTls` or if you use the highest-level API you can just supply a request with an https URI to `Http.singleRequest`. The client side methods take an optional `HttpsContext` as well but will also use the default Java SSL settings (like root certificates, etc.) when none is supplied. HTH Johannes On Tuesday, July 14, 2015 at 11:57:08 AM UTC+2, Mathias Bogaert wrote: Hi, Is there an example available where SslTls is used with Akka HTTP client/server? Thanks, Mathias -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: akka streams - for comprehension counterpart
I see. So no happy path programming while not loosing the error case with reactive streams? This is a quite a bit disillusioning. Should be mentioned in all those shiny reactive stream presentations ;-) Is there some per stream exception handling mechanism instead which would materialize with the stream? I know the supervision/decider mechanism which is unfortunately per materializer or per stage. Leslie -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Can I avoid a class cast when using outlets in FlowShape?
Great to hear, happy hAkking On Tue, Jul 14, 2015 at 10:51 AM, Magnus Andersson magnus.anders...@mollyware.se wrote: Hi Thanks Victor, you're right! For the record I did try this before writing the post above, but it wouldn't work. I must have had some other type error at the same time that confused me. Because now it worked fine! /Magnus Den söndag 12 juli 2015 kl. 15:39:10 UTC+2 skrev √: On Sun, Jul 12, 2015 at 1:18 PM, Magnus Andersson magnus.a...@mollyware.se wrote: Hi I have to do a class cast to ge able to get a typed outlet from a flow that I used in a FlowGraph. 1. I'm wondering if I'm using the API wrong below, can I get the outlet in a different way? 2. Other suggestions for improvement apart from the single line is welcome, perhaps my whole approach is off? Code background: I'm forking off a call to a cache service, which in turn is an actor that periodically fetches a fresh token from Gcloud and caches it for the duration of the token validity. If I got a token returned then enrich the HttpRequest with an Authorization header. val addAuthenticationToken:Flow[HttpRequest, Option[HttpRequest]] = Flow() { implicit b ⇒ import FlowGraph.Implicits._ val bcast = b.add( Broadcast[HttpRequest]( 2 ) ) val zip = b.add( Zip[Option[Token], HttpRequest] ) val fetchToken:FlowShape[HttpRequest, Option[Token]] = b.add( fetchTokenService ) val addToken:FlowShape[(Option[Token], HttpRequest)] = b.add( mapTokenRequest ) bcast ~ fetchToken ~ zip.in0 bcast ~ zip.in1 zip.out ~ addToken // THIS CLASS CAST IS BUGGING ME. AM I USING THE API WRONG HERE? val output = addToken.outlets.head.asInstanceOf[Outlet[Option[HttpRequest]]] ( bcast.in, output ) } This is the reason the class cast is necessary, below shows what the source code for FlowShape in Akka looks like this: final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) // === NO TYPES HERE override val outlets: immutable.Seq[Outlet[_]] = List(outlet) // === NO TYPES HERE ... } Suggestions appreciated, thanks! Use `addToken.outlet` (as per signature below) final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, sproposed inlets [${inlets.mkString(, )}] do not fit FlowShape) require(outlets.size == 1, sproposed outlets [${outlets.mkString(, )}] do not fit FlowShape) FlowShape(inlets.head, outlets.head) } } /Magnus -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: akka streams - for comprehension counterpart
Hi Leslie, On Tuesday, July 14, 2015 at 1:38:02 PM UTC+2, leslie...@googlemail.com wrote: When programming with functions this kind of issue is solved quite elegantly by using a for comprehension: Not a solution but a comment. In a for comprehension with usual types (Future/Option/Either/Try) the calculation is also cut short on the first error. So, it may not differ so much in that regard. However, streams are different in another way: a Flow[T, U] is more than just a function T = U, it's more like a T = Seq[U] that can create any number of results for any input element. This makes it hard to create something like an `eitherFlow(leftFlow: Flow[L, U], rightFlow: Flow[R, U]): Flow[Either[L, R], U]` that would bypass errors around some components because in general you somehow need to constrain the argument flows to produce exactly one output element for each input element. Even then you need to prevent two subsequent elements `Right(r)`, `Left(l)` to start a race between the left and right branch (if you are interested in keeping the order). I think that's the main problem, that it's hard to come with an exact specification for a general element that would solve the problem in all cases. akka-http has lots of places where we do pass control information around actual processing. However, in the end we had to hand-tune the bypass in most places because more general solutions didn't work out because of some pecularities of the needed semantics. Johannes -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: akka streams - for comprehension counterpart
Am Dienstag, 14. Juli 2015 15:42:33 UTC+2 schrieb √: Hi Leslie, On Tue, Jul 14, 2015 at 3:34 PM, leslie...@googlemail.com javascript: wrote: I see. So no happy path programming while not loosing the error case with reactive streams? This is a quite a bit disillusioning. Should be mentioned in all those shiny reactive stream presentations ;-) It's mentioned all the time afaik, onError is for out-of-band stream teardown, onComplete is in-band stream teardown. If one wants to track transient errors, the solution is to be honest about it and do it in the element type using something like Either or Try. This is exactly the point that is unclear to me. From 'classic' scala programming I'm used to have Try[T] outputs. So far its clear. But how do I pass it downstream? Currently I see two options: 1. Pass the Try[T] as input into the next stage which just pass through the error or evaluates new data. - I can do this only for flow stages which are designed for it. Client http requests via akka HttpExt Flow wouldn't accept a Try[T] the input needs to be a (HttpRequest, T) - Errors in early stages needs to be passed down through all stages 2. Transform each error causing flow stage into a graph with std out for the happy path and error out for the error path. - Would quickly become complex and error prone. Forking after each method invocation into a good and a bad path, I guess this was the point why exceptions were invented. Is there some per stream exception handling mechanism instead which would materialize with the stream? I know the supervision/decider mechanism which is unfortunately per materializer or per stage. Leslie -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ The use case I have in mind is an http server. There is a stream per request type, let say configure a dsl line. The stream is continuously fed with requests from a REST client and produces continuously responses which are sent back to the client. Inside the stream there are successive http request/response stages for communication with a device. There is lots of validation required here. And there is always a response required. Can it be that infinite streams are troublesome by nature? So it would be better to open one stream per request? Than the onComplete/onFailure way would work. But it still feels unnatural to me to have a stream per request pattern! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: default-dispatcher = CallingThreadDispatcher ?
Hi, I am trying to use the `CallingThreadDispatcherConfigurator`, but unfortunately the test blocks as soon as I try to instantiate an actor, with a deadlock that looks similar to an akka issue https://github.com/akka/akka/issues/17253. Have you faced a similar situation before? Do you have any idea how to get it working? On Thursday, January 19, 2012 at 4:00:04 PM UTC, D Pars wrote: Patrik, This works like a charm! (...he says as he deletes dozens and dozens of Thread.sleeps from his tests). Thank you! On Jan 19, 4:31 am, Patrik Nordwall patrik.nordw...@gmail.com wrote: You should define the class name of the MessageDispatcherConfigurator, not the actual dispatcher. Try type = akka.testkit.CallingThreadDispatcherConfigurator On Wed, Jan 18, 2012 at 10:50 PM, D Pars dpars...@gmail.com wrote: On my TEST classpath, I'm trying to use a .conf file with a CallingThreadDispatcher to mimic the behavior of a TestActorRef. Is there a way to do this globally? The following: default-dispatcher { type=akka.testkit.CallingThreadDispatcher } Is resulting in: java.lang.IllegalArgumentException: Cannot instantiate MessageDispatcherConfigurator type [akka.testkit.CallingThreadDispatcher], defined in [akka.actor.default- dispatcher], make sure it has constructor with [com.typesafe.config.Config] and [akka.dispatch.DispatcherPrerequisites] parameters at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:149) at akka.dispatch.Dispatchers.lookupConfigurator(Dispatchers.scala:78) at akka.dispatch.Dispatchers.lookup(Dispatchers.scala:68) at akka.dispatch.Dispatchers.defaultGlobalDispatcher(Dispatchers.scala: 58) at akka.actor.ActorSystemImpl.init(ActorSystem.scala:368) at akka.actor.ActorSystem$.apply(ActorSystem.scala:46) at akka.actor.ActorSystem$.create(ActorSystem.scala:45) at akka.actor.ActorSystem.create(ActorSystem.scala) ... Caused by: java.lang.NoSuchMethodException: akka.testkit.CallingThreadDispatcher.init(com.typesafe.config.Config, akka.dispatch.DispatcherPrerequisites) at java.lang.Class.getConstructor0(Class.java:2706) at java.lang.Class.getDeclaredConstructor(Class.java:1985) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:35) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:30) at akka.util.ReflectiveAccess $.withErrorHandling(ReflectiveAccess.scala:106) at akka.util.ReflectiveAccess$.createInstance(ReflectiveAccess.scala: 30) at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:146) ... 38 more -- You received this message because you are subscribed to the Google Groups Akka User List group. To post to this group, send email to akka...@googlegroups.com javascript:. To unsubscribe from this group, send email to akka-user+...@googlegroups.com javascript:. For more options, visit this group at http://groups.google.com/group/akka-user?hl=en. -- Patrik Nordwall Typesafe http://typesafe.com/ - The software stack for applications that scale Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: default-dispatcher = CallingThreadDispatcher ?
Hi Hosam, why do you want to run Akka using 2 threads? Where do you configure the calling thread dispatcher? -- Cheers, Konrad 'ktoso’ Malawski Akka @ Typesafe On 14 July 2015 at 17:02:16, Hosam Aly (hosama...@gmail.com) wrote: Hi, I am trying to use the `CallingThreadDispatcherConfigurator`, but unfortunately the test blocks as soon as I try to instantiate an actor, with a deadlock that looks similar to an akka issue. Have you faced a similar situation before? Do you have any idea how to get it working? On Thursday, January 19, 2012 at 4:00:04 PM UTC, D Pars wrote: Patrik, This works like a charm! (...he says as he deletes dozens and dozens of Thread.sleeps from his tests). Thank you! On Jan 19, 4:31 am, Patrik Nordwall patrik.nordw...@gmail.com wrote: You should define the class name of the MessageDispatcherConfigurator, not the actual dispatcher. Try type = akka.testkit.CallingThreadDispatcherConfigurator On Wed, Jan 18, 2012 at 10:50 PM, D Pars dpars...@gmail.com wrote: On my TEST classpath, I'm trying to use a .conf file with a CallingThreadDispatcher to mimic the behavior of a TestActorRef. Is there a way to do this globally? The following: default-dispatcher { type=akka.testkit.CallingThreadDispatcher } Is resulting in: java.lang.IllegalArgumentException: Cannot instantiate MessageDispatcherConfigurator type [akka.testkit.CallingThreadDispatcher], defined in [akka.actor.default- dispatcher], make sure it has constructor with [com.typesafe.config.Config] and [akka.dispatch.DispatcherPrerequisites] parameters at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:149) at akka.dispatch.Dispatchers.lookupConfigurator(Dispatchers.scala:78) at akka.dispatch.Dispatchers.lookup(Dispatchers.scala:68) at akka.dispatch.Dispatchers.defaultGlobalDispatcher(Dispatchers.scala: 58) at akka.actor.ActorSystemImpl.init(ActorSystem.scala:368) at akka.actor.ActorSystem$.apply(ActorSystem.scala:46) at akka.actor.ActorSystem$.create(ActorSystem.scala:45) at akka.actor.ActorSystem.create(ActorSystem.scala) ... Caused by: java.lang.NoSuchMethodException: akka.testkit.CallingThreadDispatcher.init(com.typesafe.config.Config, akka.dispatch.DispatcherPrerequisites) at java.lang.Class.getConstructor0(Class.java:2706) at java.lang.Class.getDeclaredConstructor(Class.java:1985) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:35) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:30) at akka.util.ReflectiveAccess $.withErrorHandling(ReflectiveAccess.scala:106) at akka.util.ReflectiveAccess$.createInstance(ReflectiveAccess.scala: 30) at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:146) ... 38 more -- You received this message because you are subscribed to the Google Groups Akka User List group. To post to this group, send email to akka...@googlegroups.com. To unsubscribe from this group, send email to akka-user+...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/akka-user?hl=en. -- Patrik Nordwall Typesafe http://typesafe.com/ - The software stack for applications that scale Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Message Broadcasting in an akka cluster
You could implement the mediator yourself and include all your business logic there. That said, why do this? What's the fear of an extra actor dealing with these concerns and providing separation? I don't think it's possible to use an external implementation and not have separation. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Why does FlexiMerge mandate same type on all inlets?
Hi I'm trying to build an EitherRoute and EitherMerge flows. I've completed the EitherRoute which has one inlet that accepts Either[L,R] and two outlets that produces either L or R. When I'm constructing the EitherMerge I run into problems. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Why does FlexiMerge mandate same type on all inlets?
Sorry, post button pressed by mistake. Continued: FlexiMerge only seems to be able to handle inlets of the same type. I wish to have a preferred merge that accepts two inputs, L and R and then produce Either[L,R]. I would use the read preferred. Why this limitation and how can I work around it? I have created a gist that illustrates my problem https://gist.github.com/magnusart/0802295c0fafdf9b5028. Any suggestions appreciated! /Magnus Den tisdag 14 juli 2015 kl. 17:54:23 UTC+2 skrev Magnus Andersson: Hi I'm trying to build an EitherRoute and EitherMerge flows. I've completed the EitherRoute which has one inlet that accepts Either[L,R] and two outlets that produces either L or R. When I'm constructing the EitherMerge I run into problems. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: default-dispatcher = CallingThreadDispatcher ?
Hi Konrad, Thanks for the quick reply! I actually wish I could run it with just 1 thread. I wish my tests could become totally synchronous. I am trying to find out how to do that. I am currently configuring it in application.conf. Any suggestions would be appreciated! Thank you, Hosam Aly On Tuesday, July 14, 2015 at 4:20:40 PM UTC+1, Konrad Malawski wrote: Hi Hosam, why do you want to run Akka using 2 threads? Where do you configure the calling thread dispatcher? -- Cheers, Konrad 'ktoso’ Malawski Akka http://akka.io @ Typesafe http://typesafe.com On 14 July 2015 at 17:02:16, Hosam Aly (hosa...@gmail.com javascript:) wrote: Hi, I am trying to use the `CallingThreadDispatcherConfigurator`, but unfortunately the test blocks as soon as I try to instantiate an actor, with a deadlock that looks similar to an akka issue https://github.com/akka/akka/issues/17253. Have you faced a similar situation before? Do you have any idea how to get it working? On Thursday, January 19, 2012 at 4:00:04 PM UTC, D Pars wrote: Patrik, This works like a charm! (...he says as he deletes dozens and dozens of Thread.sleeps from his tests). Thank you! On Jan 19, 4:31 am, Patrik Nordwall patrik.nordw...@gmail.com wrote: You should define the class name of the MessageDispatcherConfigurator, not the actual dispatcher. Try type = akka.testkit.CallingThreadDispatcherConfigurator On Wed, Jan 18, 2012 at 10:50 PM, D Pars dpars...@gmail.com wrote: On my TEST classpath, I'm trying to use a .conf file with a CallingThreadDispatcher to mimic the behavior of a TestActorRef. Is there a way to do this globally? The following: default-dispatcher { type=akka.testkit.CallingThreadDispatcher } Is resulting in: java.lang.IllegalArgumentException: Cannot instantiate MessageDispatcherConfigurator type [akka.testkit.CallingThreadDispatcher], defined in [akka.actor.default- dispatcher], make sure it has constructor with [com.typesafe.config.Config] and [akka.dispatch.DispatcherPrerequisites] parameters at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:149) at akka.dispatch.Dispatchers.lookupConfigurator(Dispatchers.scala:78) at akka.dispatch.Dispatchers.lookup(Dispatchers.scala:68) at akka.dispatch.Dispatchers.defaultGlobalDispatcher(Dispatchers.scala: 58) at akka.actor.ActorSystemImpl.init(ActorSystem.scala:368) at akka.actor.ActorSystem$.apply(ActorSystem.scala:46) at akka.actor.ActorSystem$.create(ActorSystem.scala:45) at akka.actor.ActorSystem.create(ActorSystem.scala) ... Caused by: java.lang.NoSuchMethodException: akka.testkit.CallingThreadDispatcher.init(com.typesafe.config.Config, akka.dispatch.DispatcherPrerequisites) at java.lang.Class.getConstructor0(Class.java:2706) at java.lang.Class.getDeclaredConstructor(Class.java:1985) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:35) at akka.util.ReflectiveAccess$$anonfun$createInstance $2.apply(ReflectiveAccess.scala:30) at akka.util.ReflectiveAccess $.withErrorHandling(ReflectiveAccess.scala:106) at akka.util.ReflectiveAccess$.createInstance(ReflectiveAccess.scala: 30) at akka.dispatch.Dispatchers.configuratorFrom(Dispatchers.scala:146) ... 38 more -- You received this message because you are subscribed to the Google Groups Akka User List group. To post to this group, send email to akka...@googlegroups.com. To unsubscribe from this group, send email to akka-user+...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/akka-user?hl=en. -- Patrik Nordwall Typesafe http://typesafe.com/ - The software stack for applications that scale Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email
Re: [akka-user] Re: akka streams - for comprehension counterpart
On Tue, Jul 14, 2015 at 4:51 PM, leslie.le...@googlemail.com wrote: Am Dienstag, 14. Juli 2015 15:42:33 UTC+2 schrieb √: Hi Leslie, On Tue, Jul 14, 2015 at 3:34 PM, leslie...@googlemail.com wrote: I see. So no happy path programming while not loosing the error case with reactive streams? This is a quite a bit disillusioning. Should be mentioned in all those shiny reactive stream presentations ;-) It's mentioned all the time afaik, onError is for out-of-band stream teardown, onComplete is in-band stream teardown. If one wants to track transient errors, the solution is to be honest about it and do it in the element type using something like Either or Try. This is exactly the point that is unclear to me. From 'classic' scala programming I'm used to have Try[T] outputs. So far its clear. But how do I pass it downstream? Currently I see two options: 1. Pass the Try[T] as input into the next stage which just pass through the error or evaluates new data. - I can do this only for flow stages which are designed for it. Client http requests via akka HttpExt Flow wouldn't accept a Try[T] the input needs to be a (HttpRequest, T) Which means that you need to decide what to do before passing it along to that. How is that different from how you'd need to deal with having a Try[T] and require to return T? - Errors in early stages needs to be passed down through all stages If they are transient you can discard them if the parts after it are not interested. 2. Transform each error causing flow stage into a graph with std out for the happy path and error out for the error path. - Would quickly become complex and error prone. Forking after each method invocation into a good and a bad path, I guess this was the point why exceptions were invented. As Johannes mentioned before, the current semantics is exactly that of normal code (exceptions thrown unwinds the stack and abandon work to be done after the cause of the exception. It should also be possible to hoist T = R Flows into Try[T] = Try[R] (passing thru failures from upstream to downstream) and Try[T] = R (dropping failures from upstream). Is there some per stream exception handling mechanism instead which would materialize with the stream? I know the supervision/decider mechanism which is unfortunately per materializer or per stage. Leslie -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ The use case I have in mind is an http server. There is a stream per request type, let say configure a dsl line. The stream is continuously fed with requests from a REST client and produces continuously responses which are sent back to the client. Inside the stream there are successive http request/response stages for communication with a device. There is lots of validation required here. And there is always a response required. Can it be that infinite streams are troublesome by nature? So it would be better to open one stream per request? Than the onComplete/onFailure way would work. But it still feels unnatural to me to have a stream per request pattern! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Sequence of events [OnComplete, OnNext, OnError] when using ActorSubscriber with akka streams
You are closing over internal actor state and are manipulating it from other threads (inside the onComplete callback of the future) You could possibly avoid the ActorSubscriber alltogether by using mapAsync + conflate and a periodic Sink. On Tue, Jul 14, 2015 at 9:18 AM, Ajay Kamble ajay.rivend...@gmail.com wrote: Hello All, We are using Akka Streams to process 400,000 xml documents, run it through series of transformations and then save it to a database. We are using basic transformation and here is how our stream code looks, Source(Set(allDocumentUris)) .map(uri = getDocumentFromNetwork(uri)) .map(doc = transformation1(doc)) .map(doc = saveToDatabase(doc)) .runWith(Sink(transformationStatusActorSubscriber)) We wanted to keep track of all successful and failed documents and be able to print summary at the end of transformation. We decided to use ActorSubscriber at end of stream because it allows us to keep data and keep updating it without worrying about thread-safety. Here is how our actor looks: class SomeActor extends ActorSubscriber { private var successful = Set.empty[String] private var failed = Set.empty[String] private var numberOfDocumentsToProcess = 0 private var complete = false context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress) override protected def requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = 10) override def receive: Receive = { case EchoProgress = echoProgress() case Count(size) = numberOfDocumentsToProcess = size case OnNext(element: (String, Future[Unit])) = element._2 onComplete { case Success(_) = successful = successful + element._1 processComplete() case Failure(error) = failed = failed + TransformationFailure(element._1, error) processComplete() } case OnError(error) = context.stop(self) case OnComplete = complete = true } private def isStreamComplete = { val totalDocumentsProcessedSoFar = successful.size + failed.size complete (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar) } private def processComplete() { if (isStreamComplete) { echoSummary() context.stop(self) } } private def echoProgress() ... private def echoSummary() ... } Problems that we are facing 1. Sequence of messages? We are not sure about the sequence of events that actor receives. Is it possible that Actor will receive OnComplete first but then some OnNext messages are still in queue? 2. When to stop Actor? What is the correct way to stop Actor? Right now we are stopping Actor in OnError event and OnNext event (for OnNext - we check if we have processed all documents and also have already received OnComplete event). If we stop Actor in OnError and OnComplete event will it work? 3. OutOfMemory issues? We ran stream with 2 GB memory but we faced OutOfMemory error before stream completed. Because backpressure is mandatory, we thought that this will not happen. We increased memory to 4 GB and after that program executed without OutOfMemory error. Did we miss anything in our implementation, how can we ensure that we will never get OutOfMemory error irrespective of memory size available to program? 4. Program dies abruptly Right now our current code stops at some point before it has processed all documents. From our observation we think that it stops after OnComplete message is received, but in OnComplete event we are not stopping the Actor. We are not sure how to debug/fix this behavior. Appreciate any help/suggestions on this. -Regards Ajay -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit
Re: [akka-user] Asynchronous file reading
The nice thing with Akka streams and a Source[ByteString, _] is that you can easily swap it for a different source without changing the rest of your impl :-) The current impl uses nio.FileChannel with ByteBuffers which proved to give very good performance - and I've benchmarked a number of impls (file input streams, asynchronous file channel, file channel, and all those in combination with pooled byte buffers or arrays etc). The gains that AIO gives over IO are more in terms of scalability than performance I think actually. Of course it depends on access patterns, but since here we're in streaming a plain old scan is the best we want to have here. All in all, sticking to SynchronousFileSource (it is blocking, however it has it's own dedicated, preconfigured dispatcher!) seems like a very good choice to me. -- Cheers, Konrad 'ktoso’ Malawski Akka @ Typesafe On 14 July 2015 at 21:07:31, Nicolau Werneck (nwern...@gmail.com) wrote: I just want to make sure I am using the most promising alternative for I/O... I was reading files on my own with a low-level Java library, and I wasn't using nio yet. I wasn't sure if I was going to update it to use Java 8 stuff, try out akka.io, or move to akka-stream. I think I am going with Akka stream now, and I have already managed to adapt my project... Thanks, guys! ++nic On Monday, July 13, 2015 at 5:06:39 AM UTC-3, √ wrote: I'm not sure we'll be able to take advantage of that, since all they do is to multiplex it on top of a given Executor. What we could do is to create a shim over an ExecutionContext that wraps their submitted Runnables in BlockContext calls to isolate and make evasive actions. Let's discuss :) On Mon, Jul 13, 2015 at 10:01 AM, Roland Kuhn goo...@rkuhn.info wrote: Hi Nicolau, this problem is not limited to Akka: most operating systems do not provide true asynchronous file I/O themselves. We will eventually make use of Java 8’s facilities (once Streams move into the master branch), but beware that the asynchrony of the solution depends on deployment details (JVM, O/S, …). Regards, Roland 13 jul 2015 kl. 02:47 skrev Nicolau Werneck nwer...@gmail.com: What is the most current implementation of Akka based asynchronous file I/O out there? It seems akka-stream 1.0-RC3 still only has synchronous reading, and drexin's akka-io-file github repo is more than one year old. Is there no highly-experimental official async file IO library out there? ++nic -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. Dr. Roland Kuhn Akka Tech Lead Typesafe – Reactive apps on the JVM. twitter: @rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: akka actor - performance issue
1. Mistake, not 8000k ms , should be 8k ms 2. Mensioned ~13% time difference in perf. time get bigger with growing number of CountingActors. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Why does FlexiMerge mandate same type on all inlets?
Inlets are naturally Contravariant (an inlet that reads A can be viewed as an inlet that reads B : A), but that breaks down slightly when you're viewing it 'from the other side' - as the Outlet of something else. Perhaps there should be a view of the Inlet which can be used in a merge which is actually covariant, IDK. You can, however, work around it by ignoring that little detail: State[Any](ReadPreferred[Nothing](p.right, p.left).asInstanceOf[ReadPreferred[Any]]) { ... Since Nothing is a common subclass of everything, the contravariance allows you to construct the ReadPreferred[Nothing]. Then you can just cast it to a ReadPreferred[Any] - Type Erasure means it will cast successfully (at runtime)! The main danger here is that something in the implementation changes and this sort of thing might give you a runtime exception somewhere inside the FlexiMergeImpl. Luckily the current implementation will not. The other thing you should note is that L R are also erased by Type Erasure, so if you want to match on them you'll need to have a ClassTag in scope (e.g. [L : ClassTag, R : ClassTag]) (A simpler alternative is to put a separate map in front of each input from a = Left(a) and b = Right(b), then a simple MergePreferred will suffice) On Tuesday, 14 July 2015 16:58:12 UTC+1, Magnus Andersson wrote: Sorry, post button pressed by mistake. Continued: FlexiMerge only seems to be able to handle inlets of the same type. I wish to have a preferred merge that accepts two inputs, L and R and then produce Either[L,R]. I would use the read preferred. Why this limitation and how can I work around it? I have created a gist that illustrates my problem https://gist.github.com/magnusart/0802295c0fafdf9b5028. Any suggestions appreciated! /Magnus Den tisdag 14 juli 2015 kl. 17:54:23 UTC+2 skrev Magnus Andersson: Hi I'm trying to build an EitherRoute and EitherMerge flows. I've completed the EitherRoute which has one inlet that accepts Either[L,R] and two outlets that produces either L or R. When I'm constructing the EitherMerge I run into problems. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] akka actor - performance issue
Hi everyone! I working on some scala computing some game theory problem using Akka. Basically I've got three types involved : 1. ComputingActor - does the job, running 1 actor for 1 experiment 2. ResultsActor - collects results from computing actors and sends to JobScheduler when job's done 3. JobScheduler - not actor type - it takes all results from ResultsActor using '?' tell and Await.result() In ComputingActor I've got some for loop which runs 50 times, core computations are done there. for(i - 1 to basicParams.getNoOfSeas) { strgEval.evaluate //my computation I've done some performance tests and turns out that one thread program is faster ~13% (40k ms to 46k ms) What I've noticed is that when I increase number of experiments (means more actors running) time spent in this for loop is longer. So I commented out my computation in for loop and set loop like this one to check if my computation code is responsible for that. //for(j - 0 to 1000) { // var z = j*2 //} It does nothing, just keep busy CPU, but tendency is same. I don't have a good idea what might be the problem, bad design?Is it something that I missed? I've checked execution with profiler, it seams threads're working good(no bloking). Is fork-join-dispatcher working like that? I mean, it seams that work is scheduled from other ComputingActor instances on different threads. First actor ends computations in 38k ms and others soon after him, but in single threaded version it takes only 8000k ms using my computation code. In that case multithreaded program doesn't give any additional performance value. :/ Will appreciate any hints. PS. I also cheked it on coomon java thread version of program. If I spawn more threads I've got longer in that loop. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: akka actor - performance issue
How many CPU cores do you have, and how many simultaneous actors are you running? You should at least see a speedup of close to 2x with two actors, but as you increase the number of actors you will eventually reach a limit due to overheads and serial portions of the program. Processing might even become slower than in serial. Ahmdal's law, sort of. ++nic On Tuesday, July 14, 2015 at 8:18:24 PM UTC-3, fly...@gmail.com wrote: Hi everyone! I working on some scala computing some game theory problem using Akka. Basically I've got three types involved : 1. ComputingActor - does the job, running 1 actor for 1 experiment 2. ResultsActor - collects results from computing actors and sends to JobScheduler when job's done 3. JobScheduler - not actor type - it takes all results from ResultsActor using '?' tell and Await.result() In ComputingActor I've got some for loop which runs 50 times, core computations are done there. for(i - 1 to basicParams.getNoOfSeas) { strgEval.evaluate //my computation I've done some performance tests and turns out that one thread program is faster ~13% (40k ms to 46k ms) What I've noticed is that when I increase number of experiments (means more actors running) time spent in this for loop is longer. So I commented out my computation in for loop and set loop like this one to check if my computation code is responsible for that. //for(j - 0 to 1000) { // var z = j*2 //} It does nothing, just keep busy CPU, but tendency is same. I don't have a good idea what might be the problem, bad design?Is it something that I missed? I've checked execution with profiler, it seams threads're working good(no bloking). Is fork-join-dispatcher working like that? I mean, it seams that work is scheduled from other ComputingActor instances on different threads. First actor ends computations in 38k ms and others soon after him, but in single threaded version it takes only 8000k ms using my computation code. In that case multithreaded program doesn't give any additional performance value. :/ Will appreciate any hints. PS. I also cheked it on coomon java thread version of program. If I spawn more threads I've got longer in that loop. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Modeling simple TCP protocol that isn't always request/response in akka-stream
I just had to do this and I found the AsyncStage which allows out-of-band messages to get added to the stream. It and the DetachedStage are not yet documented[0] but after reading enough other code and taking some guesses, I was able to get out-of-band messages into my IMAP server via an AsyncStage[1] so that it would work with IDLE[2]. Hopefully it helps. 0 - http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-customize.html#Using_DetachedStage 1 - https://github.com/cretz/scimap/blob/a8dd56f9c2988098e61d1e30c495ea9ed14944dd/src/main/scala/scimap/handler/ServerHandlerStage.scala 2 - https://github.com/cretz/scimap/blob/a8dd56f9c2988098e61d1e30c495ea9ed14944dd/src/it/scala/scimap/JavaMailSpec.scala#L114 On Wednesday, July 1, 2015 at 6:55:33 AM UTC-5, Chad Selph wrote: I tried something like this, where I had a PushPullStage that accepted a Source into its constructor. It seemed like the wrong approach, because it seemed like I was just reimplementing a common merge functionality. On Wednesday, July 1, 2015 at 3:58:49 AM UTC+3, Chad Retz wrote: Although I have not done this yet, I would assume you can do this with a PushPullStage that pushes in onPull. How the state gets there to know when to push in onPull is probably an implementation detail. On Monday, June 29, 2015 at 5:15:53 AM UTC-5, Chad Selph wrote: I'm currently rewriting a library I have that previously used akka IO pipelines. Reading through the examples of streaming IO with akka-stream and akka-http, I've only seen examples with request - response type protocols. Currently, I have a BidiFlow that serializes/deserializes between ByteStrings and case classes, but I only know how to make this work in one direction (client makes request, server replies to request). In this protocol, the server can also initiate requests to the client. How can I inject a message into the BidiFlow that isn't just a response to a client request? I understand at a high level the solution is probably to make a Graph with another Source pointing to my serializer, but I've had a tough time wrapping my head around the graph DSL. If someone could point me to an example of something similar, I'd very much appreciate it. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Problem creating a balancing router for a stream sink
I am trying to write an ActorSubscriber to use as a sink to am akka stream. This actor has a router to the processing of the incoming data. My code is working just fine right now, I have it up on GitHub (https://github.com/projetoeureka/akka-mapreduce/commit/94a7896ffe5ab5b66e72a04b8ccd6c2f0b8ef390) The problem is I only managed to make it work when using a SmallestMailbox routing logic. I wanted to use a BalacingPool. But when I replaced the kind of router in my code I cannot even create the router actor. Instead I get this error below. Any thoughs? ++nic [ERROR] [07/14/2015 15:43:21.994] [Main-akka.actor.default-dispatcher-3] [akka://Main/user/$a/flow-1-3-actorSubscriberSink] received Supervise from unregistered child Actor[akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router#-332511953], this will not end well [ERROR] [07/14/2015 15:43:21.997] [Main-akka.actor.default-dispatcher-5] [akka://Main/user/$a/flow-1-3-actorSubscriberSink] configuration problem while creating [akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:166) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.ConfigurationException: configuration problem while creating [akka://Main/user/$a/flow-1-3-actorSubscriberSink/mapred-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753) at akka.actor.dungeon.Children$class.makeChild(Children.scala:206) at akka.actor.dungeon.Children$class.actorOf(Children.scala:37) at akka.actor.ActorCell.actorOf(ActorCell.scala:369) at geekie.mapred.Mapred.init(Mapred.scala:27) at geekie.mapred.Mapred$$anonfun$props$1.apply(Mapred.scala:58) at geekie.mapred.Mapred$$anonfun$props$1.apply(Mapred.scala:58) at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more Caused by: com.typesafe.config.ConfigException$BadPath: path parameter: Invalid path 'BalancingPool-/$a/flow-1-3-actorSubscriberSink/mapred-router': Token not allowed in path expression: 'a' ('$' not followed by {, 'a' not allowed after '$') (you can double-quote this token if you really want it here) at com.typesafe.config.impl.Parser.parsePathExpression(Parser.java:1095) at com.typesafe.config.impl.Parser.parsePath(Parser.java:1135) at com.typesafe.config.impl.Path.newPath(Path.java:224) at com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:80) at akka.dispatch.CachingConfig.hasPath(CachingConfig.scala:97) at akka.dispatch.Dispatchers.hasDispatcher(Dispatchers.scala:89) at akka.routing.BalancingPool.newRoutee(Balancing.scala:106) at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116) at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116) at scala.collection.generic.GenTraversableFactory.fill(GenTraversableFactory.scala:90) at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:116) at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:41) at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:105) at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:82) at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751) ... 19 more -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at
[akka-user] Re: Sequence of events [OnComplete, OnNext, OnError] when using ActorSubscriber with akka streams
I'm just starting to learn, but AFAIK order is like you would expect: you can't receive OnNext aftyer OnComplete. Also you should be terminateing at OnComplete, not at OnNext. I am not sure how your code is working, but apparently you are receiving a bunch of Futures, and attaching callbacks to them. You are not actually processing anything at the actor. So you are just pulling futures, transforming them and going ahead. You should be waiting for these futures somehow in order to actually put some back pressure to the stream. That would explain the memory exhaustion. ++nic On Tuesday, July 14, 2015 at 4:20:34 AM UTC-3, Ajay Kamble wrote: Hello All, We are using Akka Streams to process 400,000 xml documents, run it through series of transformations and then save it to a database. We are using basic transformation and here is how our stream code looks, Source(Set(allDocumentUris)) .map(uri = getDocumentFromNetwork(uri)) .map(doc = transformation1(doc)) .map(doc = saveToDatabase(doc)) .runWith(Sink(transformationStatusActorSubscriber)) We wanted to keep track of all successful and failed documents and be able to print summary at the end of transformation. We decided to use ActorSubscriber at end of stream because it allows us to keep data and keep updating it without worrying about thread-safety. Here is how our actor looks: class SomeActor extends ActorSubscriber { private var successful = Set.empty[String] private var failed = Set.empty[String] private var numberOfDocumentsToProcess = 0 private var complete = false context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress) override protected def requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = 10) override def receive: Receive = { case EchoProgress = echoProgress() case Count(size) = numberOfDocumentsToProcess = size case OnNext(element: (String, Future[Unit])) = element._2 onComplete { case Success(_) = successful = successful + element._1 processComplete() case Failure(error) = failed = failed + TransformationFailure(element._1, error) processComplete() } case OnError(error) = context.stop(self) case OnComplete = complete = true } private def isStreamComplete = { val totalDocumentsProcessedSoFar = successful.size + failed.size complete (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar) } private def processComplete() { if (isStreamComplete) { echoSummary() context.stop(self) } } private def echoProgress() ... private def echoSummary() ... } Problems that we are facing 1. Sequence of messages? We are not sure about the sequence of events that actor receives. Is it possible that Actor will receive OnComplete first but then some OnNext messages are still in queue? 2. When to stop Actor? What is the correct way to stop Actor? Right now we are stopping Actor in OnError event and OnNext event (for OnNext - we check if we have processed all documents and also have already received OnComplete event). If we stop Actor in OnError and OnComplete event will it work? 3. OutOfMemory issues? We ran stream with 2 GB memory but we faced OutOfMemory error before stream completed. Because backpressure is mandatory, we thought that this will not happen. We increased memory to 4 GB and after that program executed without OutOfMemory error. Did we miss anything in our implementation, how can we ensure that we will never get OutOfMemory error irrespective of memory size available to program? 4. Program dies abruptly Right now our current code stops at some point before it has processed all documents. From our observation we think that it stops after OnComplete message is received, but in OnComplete event we are not stopping the Actor. We are not sure how to debug/fix this behavior. Appreciate any help/suggestions on this. -Regards Ajay -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Asynchronous file reading
I just want to make sure I am using the most promising alternative for I/O... I was reading files on my own with a low-level Java library, and I wasn't using nio yet. I wasn't sure if I was going to update it to use Java 8 stuff, try out akka.io, or move to akka-stream. I think I am going with Akka stream now, and I have already managed to adapt my project... Thanks, guys!++nic On Monday, July 13, 2015 at 5:06:39 AM UTC-3, √ wrote: I'm not sure we'll be able to take advantage of that, since all they do is to multiplex it on top of a given Executor. What we could do is to create a shim over an ExecutionContext that wraps their submitted Runnables in BlockContext calls to isolate and make evasive actions. Let's discuss :) On Mon, Jul 13, 2015 at 10:01 AM, Roland Kuhn goo...@rkuhn.info javascript: wrote: Hi Nicolau, this problem is not limited to Akka: most operating systems do not provide true asynchronous file I/O themselves. We will eventually make use of Java 8’s facilities (once Streams move into the master branch), but beware that the asynchrony of the solution depends on deployment details (JVM, O/S, …). Regards, Roland 13 jul 2015 kl. 02:47 skrev Nicolau Werneck nwer...@gmail.com javascript:: What is the most current implementation of Akka based asynchronous file I/O out there? It seems akka-stream 1.0-RC3 still only has synchronous reading, and drexin's akka-io-file github repo is more than one year old. Is there no highly-experimental official async file IO library out there? ++nic -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. *Dr. Roland Kuhn* *Akka Tech Lead* Typesafe http://typesafe.com/ – Reactive apps on the JVM. twitter: @rolandkuhn http://twitter.com/#!/rolandkuhn -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.