Looks like the write to Aerospike is taking too long. Could you try writing the RDD directly to the filesystem, skipping the Aerospike write?
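The suggestion above could look something like the sketch below. This is a hypothetical, illustrative example, not code from the original job: the RDD contents, input/output paths, class name, and the coalesce target are all assumptions, and it assumes the Spark 1.x-era Java API.

```java
// Hypothetical sketch: bypass the Aerospike foreachPartition write and
// dump the RDD to the filesystem instead, to check whether the Aerospike
// sink (rather than the upstream stages) is the bottleneck.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WriteToFilesystemDebug {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("aerospike-write-debug");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Stand-in for the real RDD that was being written via foreachPartition.
        JavaRDD<String> records = sc.textFile("hdfs:///input/records");

        // Optionally shrink the ~21,000 partitions before writing, since the
        // partition count was suspected as the cause; 200 is an arbitrary guess.
        records.coalesce(200)
               .saveAsTextFile("hdfs:///tmp/aerospike-debug-output");

        sc.stop();
    }
}
```

If this filesystem write completes quickly, that would point at the Aerospike client path rather than the shuffle or upstream computation.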
foreachPartition at WriteToAerospike.java:47, took 338.345827 s

Thanks, via mobile, excuse brevity.

On Jul 12, 2016 8:08 PM, "Saurav Sinha" <sauravsinh...@gmail.com> wrote:

> Hi,
>
> I am running into an issue where the job runs with a very large number of
> partitions, around 21,000.
>
> Settings:
>
> Driver memory = 5G
> Executor memory = 10G
> Total executor cores = 32
>
> It is failing when I try to write to Aerospike; earlier it was working
> fine. I suspect the number of partitions is the reason.
>
> Kindly help me solve this.
>
> It is giving this error:
>
> 16/07/12 14:53:54 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 37 is 9436142 bytes
> 16/07/12 14:58:46 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 150060 ms exceeds timeout 120000 ms
> 16/07/12 14:58:48 WARN DAGScheduler: Creating new stage failed due to exception - job: 14
> java.lang.IllegalStateException: unread block data
>         at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
>         at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>         at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:372)
>         at org.apache.spark.scheduler.DAGScheduler.newOrUsedShuffleStage(DAGScheduler.scala:292)
>         at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:343)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:221)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:324)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:321)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
>         at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
>         at org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
>         at org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
>         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 16/07/12 14:58:48 ERROR TaskSchedulerImpl: Lost executor 0: Executor heartbeat timed out after 150060 ms
> 16/07/12 14:58:48 INFO DAGScheduler: Job 14 failed: foreachPartition at WriteToAerospike.java:47, took 338.345827 s
> 16/07/12 14:58:48 ERROR MinervaLauncher: Job failed due to exception = java.lang.IllegalStateException: unread block data
> java.lang.IllegalStateException: unread block data
>         [identical stack trace as above]
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
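One more observation on the log above: the `WARN HeartbeatReceiver` line shows executor 0 exceeding the 120000 ms heartbeat timeout, and the job fails only after the executor is lost. While the root cause is investigated, raising the network timeout can keep slow executors alive long enough to finish. These are real Spark configuration properties, but the values below are guesses, not tuned recommendations:

```properties
# spark-defaults.conf -- illustrative values only
# Executors report heartbeats to the driver at this interval.
spark.executor.heartbeatInterval   20s
# Default timeout for network interactions, including the 120s
# heartbeat timeout that killed executor 0 in the log above.
spark.network.timeout              300s
```

This only buys time; if the `foreachPartition` write genuinely takes minutes per task, the underlying slowness (the Aerospike write path or the ~21,000-partition shuffle) still needs to be fixed.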