Hello,
I am trying to run a Spark job, but when the data set is too big I get:
"org.apache.spark.SparkException: Error communicating with MapOutputTracker"
When I run the same algorithm with a smaller data set, everything goes smoothly.
Let me explain the scenario.
I download the data from Amazon S3 (sc.textFile("amazonpath")), and then wrap it in a new CoalescedRDD(data, numberOfMachines). The reason I do this is to distribute the data evenly across the HDFS file system and to avoid problems with too many open files during the shuffle.
I usually run this on 20 m2.2xlarge machines on Amazon EC2, launched with the spark-ec2 script.
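In case the exact code matters, here is roughly what the ingest step looks like (a sketch: the master URL, app name and S3 path are placeholders, not our real values):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.CoalescedRDD

val sc = new SparkContext("spark://master:7077", "rank-events")

// Placeholder S3 path; the real bucket is different.
val data = sc.textFile("s3n://my-bucket/rank-events/")

// Coalesce down to one partition per machine, so the shuffle
// does not create too many open files (map tasks x reduce tasks).
val numberOfMachines = 20
val coalesced = new CoalescedRDD(data, numberOfMachines)
```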
So, the data set has about 2.2 billion lines, each around 600 bytes. Here is my line of code:
dataAsRDD.flatMap(parseRankEvent(_)).distinct
The problem happens right after the first stage (the distinct); even if I only do a count after the distinct stage, I always get the same error.
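Spelled out, the failing job is just parse, distinct, count. parseRankEvent's signature is sketched here (the real one returns our own event type), and dataAsRDD is the coalesced RDD from above:

```scala
// parseRankEvent turns one raw log line into zero or more parsed events;
// the tab-split body below is a stand-in for our real parser.
def parseRankEvent(line: String): Seq[String] = line.split("\t").toSeq

val events = dataAsRDD.flatMap(parseRankEvent(_)).distinct
events.count() // fails in the shuffle that backs distinct
```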
INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 2 to ip-10-157-5-170.ec2.internal:52255
I already tried increasing spark.akka.frameSize (to 40, 80, even 6000), with no luck.
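For completeness, this is how I am setting it (a sketch; as I understand it, in this Spark version spark.akka.frameSize is a system property, in MB, that has to be set before the SparkContext is created -- please correct me if that is wrong):

```scala
import org.apache.spark.SparkContext

// Must be set before the SparkContext is constructed; value is in MB.
System.setProperty("spark.akka.frameSize", "80")

// Placeholder master URL and app name.
val sc = new SparkContext("spark://master:7077", "rank-events")
```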
Just to repeat: if I run the same code on a smaller data set (for example, 600 million lines), everything goes really well and I can run it on 20 m2.xlarge machines in 5 minutes (the map, distinct and count).
I have already spent around 40 hours trying to solve this, and I am out of ideas :)
Thank you for your attention; any help would be great.
I am sure that if I get this working, our company would make a great case study for Spark :) We are already an Amazon reference
http://aws.amazon.com/solutions/case-studies/chaordic-systems/
and a Cassandra one; now I am trying to move from Amazon EMR to Spark :)
Thanks
13/11/03 01:17:24 INFO cluster.ClusterTaskSetManager: Lost TID 6569 (task
0.0:12)
13/11/03 01:17:24 INFO cluster.ClusterTaskSetManager: Loss was due to
org.apache.spark.SparkException
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
13/11/03 01:17:24 INFO cluster.ClusterTaskSetManager: Starting task 0.0:12
as TID 6710 on executor 48: ip-10-118-146-196.ec2.internal (PROCESS_LOCAL)
13/11/03 01:17:24 INFO cluster.ClusterTaskSetManager: Serialized task
0.0:12 as 2577 bytes in 0 ms
r: Serialized task 2.0:46 as 2782 bytes in 0 ms
13/11/03 03:12:43 INFO spark.MapOutputTrackerActor: Asked to send map
output locations for shuffle 2 to ip-10-157-5-170.ec2.internal:52255
13/11/03 03:12:43 INFO cluster.ClusterTaskSetManager: Lost TID 6592 (task
2.0:19)
13/11/03 03:12:43 INFO cluster.ClusterTaskSetManager: Loss was due to
org.apache.spark.SparkException: Error communicating with MapOutputTracker
[duplicate 32]
13/11/03 03:12:43 INFO cluster.ClusterTaskSetManager: Starting task 2.0:19
as TID 6795 on executor 4: ip-10-157-5-170.ec2.internal (PROCESS_LOCAL)
13/11/03 03:12:43 INFO cluster.ClusterTaskSetManager: Serialized task
2.0:19 as 2782 bytes in 0 ms
13/11/03 03:12:52 INFO cluster.ClusterTaskSetManager: Lost TID 6730 (task
2.0:109)
13/11/03 03:12:52 INFO cluster.ClusterTaskSetManager: Loss was due to
org.apache.spark.SparkException
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:89)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:88)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:37)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:89)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:89)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)