Hi there,

This issue has been mentioned in:
http://apache-spark-user-list.1001560.n3.nabble.com/Java-IO-Stream-Corrupted-Invalid-Type-AC-td6925.html

However, I'm starting a new thread, since the issue is distinct from that topic's designated subject.

I'm test-running canonical conflation on a ~100 MB graph (with hopes of scaling to 10 GB or more). I'm deploying on 5 r3.xlarge machines on AWS EMR with default configurations, except for setting spark.serializer to org.apache.spark.serializer.KryoSerializer. The full stack trace from canonical conflation is pasted below; it evidently fails at "Failed to run reduce at VertexRDD.scala:100". (The same app ran fine on very small input locally.) Has there been any progress in identifying the underlying issues?

Thanks!

14/07/24 16:29:37 INFO mapred.FileInputFormat: Total input paths to process : 1
********* About to run connected components *********
14/07/24 16:29:37 INFO spark.SparkContext: Starting job: reduce at VertexRDD.scala:100
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 5 (mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 18 (mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 22 (mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 26 (mapPartitions at GraphImpl.scala:184)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Got job 0 (reduce at VertexRDD.scala:100) with 1 output partitions (allowLocal=false)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at VertexRDD.scala:100)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423), which has no
missing parents
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-5-147-209.ec2.internal:60530/user/Executor#-2098248966] with ID 0
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: ip-10-5-147-209.ec2.internal (PROCESS_LOCAL)
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2300 bytes in 3 ms
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-167-166-70.ec2.internal:53470/user/Executor#-1954387250] with ID 3
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-169-50-78.ec2.internal:37584/user/Executor#-247338355] with ID 2
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-95-161-133.ec2.internal:55718/user/Executor#-2120787048] with ID 1
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-167-166-70.ec2.internal:52351 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-5-147-209.ec2.internal:34712 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-169-50-78.ec2.internal:35244 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager ip-10-95-161-133.ec2.internal:44976 with 294.9 MB RAM
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it
14/07/24 16:30:09 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/0 is now EXITED (Command exited with code 52)
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140724162937-0004/0 removed: Command exited with code 52
14/07/24 16:30:09 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on ip-10-5-147-209.ec2.internal: remote Akka client disassociated
14/07/24 16:30:09 INFO scheduler.TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0
14/07/24 16:30:10 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/07/24 16:30:10 INFO client.AppClient$ClientActor: Executor added: app-20140724162937-0004/4 on worker-20140724151003-ip-10-5-147-209.ec2.internal-55958 (ip-10-5-147-209.ec2.internal:55958) with 4 cores
14/07/24 16:30:10 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 1 on executor 1: ip-10-95-161-133.ec2.internal (PROCESS_LOCAL)
14/07/24 16:30:10 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140724162937-0004/4 on hostPort ip-10-5-147-209.ec2.internal:55958 with 4 cores, 512.0 MB RAM
14/07/24 16:30:10 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/4 is now RUNNING
14/07/24 16:30:10 INFO scheduler.DAGScheduler: Executor lost: 0 (epoch 0)
14/07/24 16:30:10 INFO storage.BlockManagerMasterActor: Trying to remove executor 0 from BlockManagerMaster.
14/07/24 16:30:10 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2300 bytes in 5 ms
14/07/24 16:30:10 INFO storage.BlockManagerMaster: Removed 0 successfully in removeExecutor
14/07/24 16:30:11 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-5-147-209.ec2.internal:54592/user/Executor#1645884416] with ID 4
14/07/24 16:30:12 INFO storage.BlockManagerInfo: Registering block manager ip-10-5-147-209.ec2.internal:60694 with 294.9 MB RAM
14/07/24 16:30:41 INFO cluster.SparkDeploySchedulerBackend: Executor 1 disconnected, so removing it
14/07/24 16:30:41 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on ip-10-95-161-133.ec2.internal: remote Akka client disassociated
14/07/24 16:30:41 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
14/07/24 16:30:41 WARN scheduler.TaskSetManager: Lost TID 1 (task 1.0:0)
14/07/24 16:30:41 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 2: ip-10-169-50-78.ec2.internal (PROCESS_LOCAL)
14/07/24 16:30:41 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2300 bytes in 0 ms
14/07/24 16:30:41 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 4)
14/07/24 16:30:41 INFO storage.BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
14/07/24 16:30:41 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor
14/07/24 16:30:41 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/1 is now EXITED (Command exited with code 52)
14/07/24 16:30:41 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140724162937-0004/1 removed: Command exited with code 52
14/07/24 16:30:41 INFO client.AppClient$ClientActor: Executor added: app-20140724162937-0004/5 on worker-20140724151300-ip-10-95-161-133.ec2.internal-46697 (ip-10-95-161-133.ec2.internal:46697) with 4 cores
14/07/24 16:30:41 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140724162937-0004/5 on hostPort ip-10-95-161-133.ec2.internal:46697 with 4 cores, 512.0 MB RAM
14/07/24 16:30:41 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/5 is now RUNNING
14/07/24 16:30:43 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-95-161-133.ec2.internal:53295/user/Executor#-485659224] with ID 5
14/07/24 16:30:43 INFO storage.BlockManagerInfo: Registering block manager ip-10-95-161-133.ec2.internal:54103 with 294.9 MB RAM
14/07/24 16:31:09 INFO cluster.SparkDeploySchedulerBackend: Executor 2 disconnected, so removing it
14/07/24 16:31:09 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on ip-10-169-50-78.ec2.internal: remote Akka client disassociated
14/07/24 16:31:09 INFO scheduler.TaskSetManager: Re-queueing tasks for 2 from TaskSet 1.0
14/07/24 16:31:09 WARN scheduler.TaskSetManager: Lost TID 2 (task 1.0:0)
14/07/24 16:31:09 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 3 on executor 5: ip-10-95-161-133.ec2.internal (PROCESS_LOCAL)
14/07/24 16:31:09 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2300 bytes in 0 ms
14/07/24 16:31:09 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 8)
14/07/24 16:31:09 INFO storage.BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster.
14/07/24 16:31:09 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor
14/07/24 16:31:09 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/2 is now EXITED (Command exited with code 52)
14/07/24 16:31:09 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140724162937-0004/2 removed: Command exited with code 52
14/07/24 16:31:09 INFO client.AppClient$ClientActor: Executor added: app-20140724162937-0004/6 on worker-20140724151212-ip-10-169-50-78.ec2.internal-49110 (ip-10-169-50-78.ec2.internal:49110) with 4 cores
14/07/24 16:31:09 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140724162937-0004/6 on hostPort ip-10-169-50-78.ec2.internal:49110 with 4 cores, 512.0 MB RAM
14/07/24 16:31:09 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/6 is now RUNNING
14/07/24 16:31:11 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-10-169-50-78.ec2.internal:46911/user/Executor#2105325300] with ID 6
14/07/24 16:31:11 INFO storage.BlockManagerInfo: Registering block manager ip-10-169-50-78.ec2.internal:56302 with 294.9 MB RAM
14/07/24 16:31:41 INFO cluster.SparkDeploySchedulerBackend: Executor 5 disconnected, so removing it
14/07/24 16:31:41 ERROR scheduler.TaskSchedulerImpl: Lost executor 5 on ip-10-95-161-133.ec2.internal: remote Akka client disassociated
14/07/24 16:31:41 INFO scheduler.TaskSetManager: Re-queueing tasks for 5 from TaskSet 1.0
14/07/24 16:31:41 WARN scheduler.TaskSetManager: Lost TID 3 (task 1.0:0)
14/07/24 16:31:41 ERROR scheduler.TaskSetManager: Task 1.0:0 failed 4 times; aborting job
14/07/24 16:31:41 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/07/24 16:31:41 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/5 is now EXITED (Command exited with code 52)
14/07/24 16:31:41 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140724162937-0004/5 removed: Command exited with code 52
14/07/24 16:31:41 INFO client.AppClient$ClientActor: Executor added: app-20140724162937-0004/7 on worker-20140724151300-ip-10-95-161-133.ec2.internal-46697 (ip-10-95-161-133.ec2.internal:46697) with 4 cores
14/07/24 16:31:41 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140724162937-0004/7 on hostPort ip-10-95-161-133.ec2.internal:46697 with 4 cores, 512.0 MB RAM
14/07/24 16:31:41 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
14/07/24 16:31:41 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: TID 3 on host ip-10-95-161-133.ec2.internal failed for unknown reason
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/24 16:31:41 INFO client.AppClient$ClientActor: Executor updated: app-20140724162937-0004/7 is now RUNNING
14/07/24 16:31:41 INFO scheduler.DAGScheduler: Executor lost: 5 (epoch 12)
14/07/24 16:31:41 INFO storage.BlockManagerMasterActor: Trying to remove executor 5 from BlockManagerMaster.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-canonical-conflation-issues-tp10602.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
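P.S. For completeness, the one non-default setting mentioned at the top (spark.serializer) amounts to roughly the sketch below. This is a minimal illustration, not my actual driver code; the app name is a placeholder, and everything else was left at defaults.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the configuration described above: defaults everywhere,
// except Kryo serialization. The app name is a placeholder.
val conf = new SparkConf()
  .setAppName("canonical-conflation")  // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
```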