The closest information I could find online related to this error is
https://issues.apache.org/jira/browse/SPARK-3633
But our case is quite different: we never saw the "(Too many open files)"
error; the log simply shows the 120-second timeout.
I checked the GC output from all 42 executors; the longest full GC I can find
is real=11.79 secs, far less than the 120-second timeout.
Of the 42 executors, one executor's stdout/stderr page hangs; I cannot see any
GC or log information for it, but it is shown as "LOADING" on the master page.
I think the reason is simply that the WorkerUI could not bind to 8081 during
boot, bound to 8082 instead, and the master UI didn't catch that information.
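One way to confirm which port the worker UI actually bound to is to look for the service-startup line Spark writes to the worker log. This is only a sketch: the log file path in the comment is an assumption for a typical install, and here I just demonstrate the extraction on a sample line of the shape Spark logs.

```shell
# In practice, grep the worker log (path is an assumption; adjust for your install):
#   grep "started service 'WorkerUI'" /opt/spark/logs/spark-*-worker-*.out
# Demonstrated here on a sample line of the shape Spark logs:
sample="INFO Utils: Successfully started service 'WorkerUI' on port 8082."
echo "$sample" | grep -o "port [0-9]*"
```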
Anyway, my only option now is to increase both
"spark.core.connection.ack.wait.timeout" and "spark.akka.timeout" to 600, as
suggested in the JIRA, and I will report back what I find later.
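For reference, this is roughly what the invocation would look like with both timeouts raised (a sketch based on the original command further down in this thread, not a tested configuration; the timeout values are the ones suggested in the JIRA):

```shell
# Same job as before, with both ack/akka timeouts raised to 600 seconds
/opt/spark/bin/spark-shell --jars spark-avro.jar \
  --conf spark.ui.port=4042 \
  --executor-memory 20G --total-executor-cores 168 \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=6000 \
  --conf spark.default.parallelism=6000 \
  --conf spark.shuffle.blockTransferService=nio \
  --conf spark.core.connection.ack.wait.timeout=600 \
  --conf spark.akka.timeout=600 \
  -i spark.script
```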
This same daily job takes about 12 hours in Hive/MR and can finish in about 4
hours in Spark (with 25% of the cluster resources allocated). On this point,
Spark is faster and great, but only IF (big IF) every task runs smoothly.
In Hive/MR, once the job is set up, it will finish: maybe slowly, but smoothly.
In Spark, in this case, it does retry only the failed partitions, but we
sometimes saw 4 or 5 retries, which in fact makes it much, much slower.
Yong
From: [email protected]
To: [email protected]
Subject: Any suggestion about "sendMessageReliably failed because ack was not
received within 120 sec"
Date: Thu, 20 Aug 2015 20:49:52 -0400
Hi, Sparkers:
After the first 2 weeks of Spark in our production cluster, and with more
familiarity with Spark, we are more confident about avoiding "Lost Executor"
errors due to memory issues. So far, most of our jobs won't fail or slow down
due to a lost executor.
But sometimes, I observed that individual tasks failed due to
"sendMessageReliably failed because ack was not received within 120 sec".
Here is the basic information:
- Spark 1.3.1, with 1 master + 42 worker boxes in a standalone deployment
- The cluster also runs Hadoop + MapReduce, so we allocate about 25% of the
resources to Spark
- We are conservative with the Spark jobs, using a low number of cores plus
high parallelism/partitions to control memory usage in the job; so far we have
managed to avoid "lost executor"
We have one big daily job running with the following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042
--executor-memory 20G --total-executor-cores 168 --conf
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000
--conf spark.default.parallelism=6000 --conf
spark.shuffle.blockTransferService=nio -i spark.script
- 168 total cores means each executor runs with 4 threads (168 / 42 = 4)
- No cache is needed, so I set the storage memoryFraction very low
- nio is much more robust than netty in our experience
This big daily job generates over 20,000 tasks, and usually they all finish
without this issue, but sometimes, for the same job, tasks keep failing with
this error and retrying.
Even in this case, the tasks that fail with this error are retried. Retries
may be part of life in a distributed environment, but I want to know what root
cause could be behind this and how to avoid it.
Should I increase "spark.core.connection.ack.wait.timeout" to fix this error?
When this happened, I saw no executors lost; all were alive.
Below is an example message from the log; it complains about a timeout
connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
	at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
	at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed because ack was not received within 120 sec
	at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
	at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
	... 1 more
Then I checked the GC log of the executor on host-121, and I didn't see any
obvious problem there either:
12327.896: [GC [PSYoungGen: 4660710K->2330106K(4660736K)]
13954033K->12324769K(18641920K), 0.9448550 secs] [Times: user=16.40 sys=0.03,
real=0.94 secs]
12331.085: [GC [PSYoungGen: 4660730K->2330106K(4660736K)]
14655393K->12988787K(18641920K), 0.8691010 secs] [Times: user=14.76 sys=0.03,
real=0.86 secs]
12334.869: [GC [PSYoungGen: 4660730K->2330080K(4660736K)]
15319411K->13631046K(18641920K), 0.8940860 secs] [Times: user=15.20 sys=0.04,
real=0.89 secs]
12339.050: [GC [PSYoungGen: 4660704K->1990303K(4660736K)]
15961670K->13964847K(18641920K), 0.7512230 secs] [Times: user=13.25 sys=0.02,
real=0.75 secs]
12345.046: [GC [PSYoungGen: 4320927K->994234K(4660736K)]
16295471K->14101940K(18641920K), 0.8231710 secs] [Times: user=14.37 sys=0.04,
real=0.83 secs]
12345.869: [Full GC [PSYoungGen: 994234K->0K(4660736K)] [ParOldGen:
13107705K->11473827K(13981184K)] 14101940K->11473827K(18641920K) [PSPermGen:
56819K->56819K(57344K)], 8.7725670 secs] [Times: user=143.61 sys=0.13,
real=8.77 secs]
12361.444: [GC [PSYoungGen: 2330624K->544K(4660736K)]
13804451K->11474371K(18641920K), 0.0183270 secs] [Times: user=0.26 sys=0.00,
real=0.02 secs]
12368.445: [GC [PSYoungGen: 2331168K->352K(4660736K)]
13804995K->11474320K(18641920K), 0.0330490 secs] [Times: user=0.43 sys=0.02,
real=0.03 secs]
12375.528: [GC [PSYoungGen: 2330976K->352K(4660736K)]
13804944K->11474368K(18641920K), 0.0253410 secs] [Times: user=0.35 sys=0.00,
real=0.03 secs]
12382.833: [GC [PSYoungGen: 2330976K->384K(4660736K)]
13804992K->11474424K(18641920K), 0.0190140 secs] [Times: user=0.28 sys=0.00,
real=0.02 secs]
12390.006: [GC [PSYoungGen: 2331008K->576K(4660736K)]
13805048K->11474632K(18641920K), 0.0166370 secs] [Times: user=0.25 sys=0.01,
real=0.02 secs]
12397.345: [GC [PSYoungGen: 2331200K->416K(4660736K)]
13805256K->11474504K(18641920K), 0.0159600 secs] [Times: user=0.24 sys=0.00,
real=0.01 secs]
12405.098: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805128K->11474568K(18641920K), 0.0162000 secs] [Times: user=0.23 sys=0.00,
real=0.01 secs]
12412.492: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805192K->11474608K(18641920K), 0.0281690 secs] [Times: user=0.44 sys=0.00,
real=0.03 secs]
12419.666: [GC [PSYoungGen: 2331040K->416K(4660736K)]
13805232K->11474624K(18641920K), 0.0155110 secs] [Times: user=0.23 sys=0.00,
real=0.02 secs]
12427.648: [GC [PSYoungGen: 2331040K->384K(4660736K)]
13805248K->11474616K(18641920K), 0.0474190 secs] [Times: user=0.51 sys=0.01,
real=0.04 secs]
12435.858: [GC [PSYoungGen: 2331008K->320K(4660736K)]
13805240K->11474576K(18641920K), 0.0271780 secs] [Times: user=0.32 sys=0.00,
real=0.03 secs]
So the minor GCs finish in tens of milliseconds, and the full GC is done in
several seconds.
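For anyone who wants to scan their own executor GC logs for the longest pause, something like this could work (a sketch; the regex assumes the ParallelGC "real=X.XX secs" format shown in the log above, and `max_pause` is just my own helper name):

```python
import re

# Matches the wall-clock pause time ParallelGC prints at the end of each
# GC record, e.g. "[Times: user=143.61 sys=0.13, real=8.77 secs]"
PAUSE_RE = re.compile(r"real=(\d+\.\d+) secs")

def max_pause(gc_log_lines):
    """Return the longest real (wall-clock) GC pause in the log, in seconds."""
    pauses = [float(m.group(1)) for line in gc_log_lines
              for m in PAUSE_RE.finditer(line)]
    return max(pauses) if pauses else 0.0
```

Running it over the 42 executors' logs makes it quick to check whether any pause came anywhere near the 120-second timeout.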
Any idea how to avoid this case?
Thanks
Yong