Anyone got any updates on this issue. I am having similar problem where 
akka http client respond with a Success(OK) and then stream gets truncated 
while entity.dataBytes.runFold(ByteString(""))(_ ++ _). The size of the 
data received is ~20Mb.

Thanks Vimal.

On Wednesday, June 29, 2016 at 12:33:47 PM UTC+1, Ubaldo Taladríz wrote:
>
> Hi, I'm not sure how to handle the entity stream truncation exception
> I have a "Downloader" actor using akka http client side, and I tried 
> everything, shutdown the pool, terminate the actor (The parent watches the 
> child and creates a new one ).
>
> This is the internal error
> [ERROR] 2016-06-29 02:09:51.348 [local-akka.actor.default-dispatcher-3] 
> FileSubscriber - Tearing down 
> FileSink(tareas/2/16016/descarga/Resolucion_1467180465357.pdf) due to 
> upstream error
> akka.http.scaladsl.model.EntityStreamException: Entity stream truncation
>
> then i catch this error in entity.dataBytes.runWith(FileIO.toFile(new 
> File(dir, file.fileName.get))).onComplete({
> r.wasSuccesful is false (See the code below)
>
> [ERROR] 2016-06-29 02:09:51.350 [local-akka.actor.default-dispatcher-3] 
> Downloader - !!!!!!Entity stream truncation 
>
> Then i got an error for each child (The default is 4 Downloaders)
> [ERROR] 2016-06-29 02:09:51.351 [local-akka.actor.default-dispatcher-3] 
> ActorSystemImpl - Outgoing request stream error
> akka.http.scaladsl.model.EntityStreamException: Entity stream truncation
>
> Finally I got this error and the pool is shut down:
> [ERROR] 2016-06-29 02:09:51.353 [local-akka.actor.default-dispatcher-5] 
> PoolMasterActor - connection pool for PoolGateway(hcps = 
> HostConnectionPoolSetup(qadnp.portalafp.cl,443,ConnectionPoolSetup(ConnectionPoolSettings(4,5,32,1,30
>  
> seconds,ClientConnectionSettings(Some(User-Agent: akka-http/2.4.7),10 
> seconds,1 
> minute,512,<function0>,List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Full,Map(If-Range
>  
> -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, 
> Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0, User-Agent 
> -> 
> 32),false,<function1>,<function1>,<function2>))),akka.http.scaladsl.HttpsConnectionContext@5a726436,akka.event.BusLogging@68cb54fd)))
>  
> has shut down unexpectedly
>
>
> This is the code
>
>  case JobStart(t: Task) =>
>       task = t
>       if (!t.task.parameter.get.getOrElse("local", "false").toBoolean) {
>         val uri = t.file.parameters.get("downloadUri")
>         bd.run(updateFile(t.file, "downloading")).onComplete({
>           case Success(r) =>
>             http.get.singleRequest(HttpRequest(uri = uri)).pipeTo(self)
>           case Failure(e) =>
>             throw e
>         })(fixedThreadPoolExecutionContext)
>       }
>       else {
>         endJob(t)
>       }
>
> case HttpResponse(OK, headers, entity, _) =>
>       val content = headers.filter(_.name() == "Content-Disposition")
>       if (content.size > 0) {
>         val dir = new File(task.get.dir, "download")
>         dir.mkdir()
>         val file = task.get.file.copy(fileName = 
> Some(headers.filter(_.name() == 
> "Content-Disposition").head.value().split(";")(1).split("=")(1)))
>         task = Some(task.get.copy(file = file))
>         entity.dataBytes.runWith(FileIO.toFile(new File(dir, 
> file.fileName.get))).onComplete({
>           case Success(r) =>
>             log.debug(s"Downloaded ${task.get.file.id.get}  actor ${
> self.path.name}")
>             if (r.wasSuccessful)
>               bd.run(updateFile(task.get.file, "downloadd")).onComplete({
>                 case Success(r) =>
>                   endJob(task.get)
>                 case Failure(e) => throw e
>               })(fixedThreadPoolExecutionContext)
>             else {
>               log.error(s"!!!!!!${r.getError.getMessage}",r.getError)
>               bd.run(updateFile(task.get.file, "downloading", Some(s"Error 
>  ${r.getError.getMessage}"), Some(Errors.downloadingError))).onComplete({
>                 case Success(t) =>
>                   log.info("!!!!Finalizo Trabajo")
>                   endJob(task.get.copy(retries = task.get.retries + 1, 
> state = "start"))
>                   context.stop(self) //I tried everything, 
> http.shutdownAllConnectionPools()
>                 case Failure(e) => throw e
>               })(fixedThreadPoolExecutionContext)
>
>             }
>           case Failure(e) =>
>             log.error(e.getMessage,e)
>             bd.run(updateFile(trask.get.file, "downloading", Some(s"Error 
>  ${e.getMessage}"), Some(Errors.downloadingError))).onComplete({
>               case Success(t) =>
>                 endJob(task.get.copy(retris = task.get.retries + 1, state 
> = "start"))
>               case Failure(e) => throw e
>             })(fixedThreadPoolExecutionContext)
>         })
>       }
>       else {
>       // The server responds HTTP 200 OK but the content has no file 
> attached
>         entity.toStrict(2.second).map(s=>s.data.utf8String).onComplete({
>           case Success(_) =>
>           case Failure(_) =>
>         }
>         )(fixedThreadPoolExecutionContext)
>         bd.run(updateFile(task.get.file, "downloading", Some(s"Error no 
>  Content-Disposition header"), Some(Errors.downloadingError))).onComplete({
>           case Success(t) =>
>             endJob(task.get.copy(retries = task.get.retries + 1, state = 
> "start"))
>           case Failure(e) => throw e
>         })(fixedThreadPoolExecutionContext)
>       }
>
>
>     case HttpResponse(code, headers, entity, _) =>
>       //Server respond != HTTP 200 OK
>       entity.toStrict(2.second).map(s=>s.data.utf8String).onComplete({
>         case Success(_) =>
>         case Failure(_) =>
>       }
>       )(fixedThreadPoolExecutionContext)
>       bd.run(updateFile(task.get.file, "downloading", Some(s"Error http 
> ${code.intValue()} ${code.defaultMessage()}"), 
> Some(Errores.downloadingError))).onComplete({
>         case Success(t) =>
>           endJob(task.get.copy(retries = task.get.retries + 1, state = 
> "start"))
>         case Failure(e) => throw e
>       })(fixedThreadPoolExecutionContext)
>
>     case akka.actor.Status.Failure(e) =>
>      //URI is wrong
>       log.error("Failure Akka")
>       log.error(e, e.getMessage)
>       bd.run(updateFile(task.get.file, "downloading", Some(e.getMessage), 
> Some(Errors.downloadingError))).onComplete({
>         case Success(t) =>
>           endJob(task.get.copy(retries = task.get.retries + 1, state = 
> "start"))
>         case Failure(e) => throw e
>       })(fixedThreadPoolExecutionContext)
>
>
> -- 
>
> Ubaldo Taladriz Truan
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to