I'm just starting to learn, but AFAIK the order is what you would expect: you 
can't receive OnNext after OnComplete. Also, you should be terminating at 
OnComplete, not at OnNext.

I am not sure how your code is working, but apparently you are receiving a 
bunch of Futures and attaching callbacks to them. You are not actually 
processing anything in the actor, so as far as the stream is concerned each 
element is done as soon as its Future has been handed over. You are just 
pulling futures, attaching callbacks and moving on, and the pending futures 
can pile up faster than they complete. You should wait for these futures 
somehow in order to actually apply back pressure to the stream. The lack of 
waiting would explain the memory exhaustion.
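
To make "waiting for the futures" concrete, here is a minimal sketch in plain 
Scala, with no Akka involved. It starts at most `parallelism` futures at a 
time and blocks until that batch has finished before starting the next one, so 
the number of pending futures stays bounded. Everything named here 
(`BoundedFutures`, `processDocument`, `runBounded`, the "bad" URI prefix) is 
made up for illustration; `processDocument` is only a stand-in for your 
getDocumentFromNetwork / transformation1 / saveToDatabase chain. As far as I 
know Akka Streams has a `mapAsync` stage that does this kind of bounding 
inside the stream itself, which is probably the better fit, but I have not 
tried it against your code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedFutures {
  private val lock = new Object
  private var pending = 0      // futures created but not yet finished
  private var peakPending = 0  // highest value `pending` has reached

  // Hypothetical stand-in for the real fetch/transform/save work.
  // URIs starting with "bad" fail, to exercise the failure path.
  def processDocument(uri: String): Future[Unit] = {
    lock.synchronized {
      pending += 1
      peakPending = math.max(peakPending, pending)
    }
    Future {
      try {
        Thread.sleep(2) // simulate slow I/O
        if (uri.startsWith("bad")) throw new RuntimeException(s"failed: $uri")
      } finally lock.synchronized { pending -= 1 }
    }
  }

  // Processes all URIs, never starting a new batch before the previous one
  // has completed. Returns (uri, succeeded) pairs and the peak pending count.
  def runBounded(uris: Seq[String], parallelism: Int): (Seq[(String, Boolean)], Int) = {
    lock.synchronized { pending = 0; peakPending = 0 }
    val results = uris.grouped(parallelism).flatMap { batch =>
      val started = batch.map { uri =>
        processDocument(uri)
          .map(_ => (uri, true))
          .recover { case _ => (uri, false) }
      }
      // Blocking here is what slows the producer down to the consumer's pace.
      Await.result(Future.sequence(started), 30.seconds)
    }.toVector
    (results, lock.synchronized(peakPending))
  }

  def main(args: Array[String]): Unit = {
    val uris = (1 to 20).map(i => if (i % 5 == 0) s"bad-$i" else s"doc-$i")
    val (results, peak) = runBounded(uris, parallelism = 4)
    val (ok, failed) = results.partition(_._2)
    println(s"successful=${ok.size} failed=${failed.size} peakPending=$peak")
  }
}
```

Batching like this is cruder than a real stream stage, since each batch waits 
for its slowest element, but it shows the idea: because successes and failures 
are collected only after each future has finished, the summary can be printed 
once the input is exhausted, without a separate document count.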

    ++nic

On Tuesday, July 14, 2015 at 4:20:34 AM UTC-3, Ajay Kamble wrote:
>
> Hello All,
>
> We are using Akka Streams to process 400,000 xml documents, run it through 
> series of transformations and then save it to a database. We are using 
> basic transformation and here is how our stream code looks,
>
> Source(Set(allDocumentUris))
>       .map(uri => getDocumentFromNetwork(uri))
>       .map(doc => transformation1(doc))
>       .map(doc => saveToDatabase(doc))
>       .runWith(Sink(transformationStatusActorSubscriber))
>
> We wanted to keep track of all successful and failed documents and be able 
> to print summary at the end of transformation. We decided to use 
> ActorSubscriber at end of stream because it allows us to keep data and keep 
> updating it without worrying about thread-safety. Here is how our actor 
> looks:
>
> class SomeActor extends ActorSubscriber {
>
>   private var successful = Set.empty[String]
>   private var failed = Set.empty[String]
>
>   private var numberOfDocumentsToProcess = 0
>   private var complete = false
>
>   context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress)
>
>   override protected def requestStrategy: RequestStrategy =
>     WatermarkRequestStrategy(highWatermark = 10)
>
>   override def receive: Receive = {
>     case EchoProgress =>
>       echoProgress()
>     case Count(size) =>
>       numberOfDocumentsToProcess = size
>     case OnNext(element: (String, Future[Unit])) =>
>       element._2 onComplete {
>         case Success(_) =>
>           successful = successful + element._1
>           processComplete()
>         case Failure(error) =>
>           failed = failed + TransformationFailure(element._1, error)
>           processComplete()
>       }
>     case OnError(error) =>
>       context.stop(self)
>     case OnComplete =>
>       complete = true
>   }
>
>   private def isStreamComplete = {
>     val totalDocumentsProcessedSoFar = successful.size + failed.size
>     complete && (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar)
>   }
>
>   private def processComplete() {
>     if (isStreamComplete) {
>       echoSummary()
>       context.stop(self)
>     }
>   }
>   
>   private def echoProgress() ...
>   private def echoSummary() ...
>   
> }
>
> Problems that we are facing
> ----------------------------
>
> 1. Sequence of messages? 
>    We are not sure about the sequence of events that actor receives. Is it 
> possible that Actor will receive OnComplete first but then some OnNext 
> messages are still in queue?
>
> 2. When to stop Actor?
>    What is the correct way to stop Actor? Right now we are stopping Actor 
> in OnError event and OnNext event (for OnNext - we check if we have 
> processed all documents and also have already received OnComplete event). 
> If we stop Actor in OnError and OnComplete event will it work?
>
> 3. OutOfMemory issues?
>    We ran stream with 2 GB memory but we faced OutOfMemory error before 
> stream completed. Because backpressure is mandatory, we thought that this 
> will not happen. We increased memory to 4 GB and after that program 
> executed without OutOfMemory error. Did we miss anything in our 
> implementation, how can we ensure that we will never get OutOfMemory error 
> irrespective of memory size available to program?
>
> 4. Program dies abruptly
>    Right now our current code stops at some point before it has processed 
> all documents. From our observation we think that it stops after OnComplete 
> message is received, but in OnComplete event we are not stopping the Actor. 
> We are not sure how to debug/fix this behavior.
>
> Appreciate any help/suggestions on this.
>
> -Regards
> Ajay
>
