Re: OOM Java heap space error on saveAsTextFile
What operation are you performing before the saveAsTextFile? If you are doing a groupBy/sortBy/mapPartitions/reduceByKey operation then you can specify the number of partitions. We were facing this kind of problem, and specifying the correct number of partitions solved the issue.

Thanks
Best Regards

On Fri, Aug 22, 2014 at 2:06 AM, Daniil Osipov wrote:

> Hello,
>
> My job keeps failing on the saveAsTextFile stage (frustrating after a 3 hour
> run) with an OOM exception. The log is below. I'm running the job on an
> input of ~8TB of gzipped JSON files, executing on 15 m3.xlarge instances.
> Each executor is given 13GB of memory, and I'm setting two custom properties
> in the job: spark.akka.frameSize: 50 (otherwise it fails due to exceeding
> the limit of 10MB), spark.storage.memoryFraction: 0.2
>
> Any suggestions?
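For example, the shuffle operation can take an explicit partition count. The sketch below is illustrative only: the input path, key extraction, and the figure of 2000 partitions are assumptions, not details from the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job outline. The point is the second argument to
// reduceByKey: an explicit numPartitions for the shuffle, so that
// each reduce-side partition stays small enough to fit in memory.
object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-sketch"))

    val records = sc.textFile("s3n://bucket/input/*.json.gz")

    // Without the second argument, reduceByKey inherits the parent RDD's
    // partition count; with large gzipped input (one partition per file,
    // since gzip is not splittable) that can leave partitions far too big.
    val counts = records
      .map(line => (line.take(10), 1L))   // illustrative key extraction
      .reduceByKey(_ + _, 2000)           // illustrative partition count

    counts.saveAsTextFile("s3n://bucket/output")
    sc.stop()
  }
}
```

Alternatively, a `repartition(n)` just before `saveAsTextFile` achieves a similar effect when no shuffle operation precedes the save.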
OOM Java heap space error on saveAsTextFile
Hello,

My job keeps failing on the saveAsTextFile stage (frustrating after a 3 hour run) with an OOM exception. The log is below. I'm running the job on an input of ~8TB of gzipped JSON files, executing on 15 m3.xlarge instances. Each executor is given 13GB of memory, and I'm setting two custom properties in the job: spark.akka.frameSize: 50 (otherwise it fails due to exceeding the limit of 10MB), spark.storage.memoryFraction: 0.2

Any suggestions?

14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962
14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 17541459 bytes
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906
14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
    at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
    at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145)
    at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156)
    at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569)
    at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at akka.actor.FSM$class.processEvent(FSM.scala:595)
    at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at RecRateApp.scala:88
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGSchedu
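For reference, the two properties described above would typically be applied on the SparkConf before the context is created. This is a sketch of that configuration, not the poster's actual code; the app name is illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the configuration described above (Spark 1.x settings).
// spark.akka.frameSize is in MB and defaults to 10, which the
// 17541459-byte map-output statuses in the log would exceed;
// spark.storage.memoryFraction (default 0.6) trims the cache's
// share of the heap to leave more room for shuffle and user data.
val conf = new SparkConf()
  .setAppName("rec-rate")                      // illustrative name
  .set("spark.akka.frameSize", "50")           // MB; default is 10
  .set("spark.storage.memoryFraction", "0.2")  // default is 0.6
  .set("spark.executor.memory", "13g")
val sc = new SparkContext(conf)
```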