Re: OOM Java heap space error on saveAsTextFile

2014-08-21 Thread Akhil Das
What operation are you performing before the saveAsTextFile? If you are
doing a groupBy/sortBy/mapPartitions/reduceByKey operation, you can
specify the number of partitions explicitly. We were facing this kind of
problem, and specifying the right number of partitions solved the issue.
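
For example, something like this (a minimal, hypothetical sketch assuming the
shuffle comes from a reduceByKey before the save; the input path, key
extraction, and partition count are placeholders to adapt to your job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on Spark 1.0/1.1)

object PartitionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-example"))

    // Placeholder partition count: raise it until each shuffle partition is
    // small enough to fit comfortably in executor memory.
    val numPartitions = 2000

    sc.textFile("hdfs:///input/json")           // placeholder input path
      .map(line => (line.take(8), 1L))          // toy key extraction, for illustration only
      .reduceByKey(_ + _, numPartitions)        // pass the partition count explicitly
      .saveAsTextFile("hdfs:///output/counts")  // placeholder output path

    sc.stop()
  }
}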

Thanks
Best Regards


On Fri, Aug 22, 2014 at 2:06 AM, Daniil Osipov wrote:

> Hello,
>
> My job keeps failing at the saveAsTextFile stage (frustrating after a
> 3-hour run) with an OOM exception. The log is below. I'm running the job
> on ~8 TB of gzipped JSON input, executing on 15 m3.xlarge instances. Each
> executor is given 13 GB of memory, and I'm setting two custom properties
> in the job: spark.akka.frameSize: 50 (otherwise it fails by exceeding the
> 10 MB limit) and spark.storage.memoryFraction: 0.2.
>
> Any suggestions?

OOM Java heap space error on saveAsTextFile

2014-08-21 Thread Daniil Osipov
Hello,

My job keeps failing at the saveAsTextFile stage (frustrating after a 3-hour
run) with an OOM exception. The log is below. I'm running the job on ~8 TB
of gzipped JSON input, executing on 15 m3.xlarge instances. Each executor is
given 13 GB of memory, and I'm setting two custom properties in the job:
spark.akka.frameSize: 50 (otherwise it fails by exceeding the 10 MB limit)
and spark.storage.memoryFraction: 0.2.
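
For context, the properties are set on the SparkConf when the context is
created, roughly like this (simplified sketch, not the exact job code):

import org.apache.spark.{SparkConf, SparkContext}

// Simplified sketch of the job configuration; values as described above.
val conf = new SparkConf()
  .setAppName("RecRateApp")
  .set("spark.akka.frameSize", "50")           // in MB; the 10 MB default was being exceeded
  .set("spark.storage.memoryFraction", "0.2")  // shrink the RDD storage cache to 20% of the heap

val sc = new SparkContext(conf)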

Any suggestions?

14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962
14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output
statuses for shuffle 1 is 17541459 bytes
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906
14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem
[spark]
java.lang.OutOfMemoryError: Java heap space
at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145)
at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156)
at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569)
at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at akka.actor.FSM$class.processEvent(FSM.scala:595)
at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile
at RecRateApp.scala:88
Exception in thread "main" org.apache.spark.SparkException: Job cancelled
because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGSchedu