The closest information I could find online related to this error is https://issues.apache.org/jira/browse/SPARK-3633, but our case is quite different. We never saw the "(Too many open files)" error; the log simply shows the 120 sec timeout. I checked the GC output from all 42 executors, and the longest full GC I can find is real=11.79 secs, way less than the 120 second timeout.

Of the 42 executors, there is one executor whose stdout/stderr page hangs; I cannot see any GC or log information for it, but it is shown as "LOADING" on the master page. I think the reason is just that the WorkerUI could not bind to 8081 during boot and bound to 8082 instead, and the master UI didn't catch that.

Anyway, my only option now is to increase both "spark.core.connection.ack.wait.timeout" and "spark.akka.timeout" to 600, as suggested in the jira, and I will report back what I find.

This same daily job runs about 12 hours in Hive/MR and finishes in about 4 hours in Spark (with 25% of the cluster resources allocated). On this point Spark is faster and great, but IF (big IF) every task runs smoothly. In Hive/MR, once the job is set up, it will finish; maybe slowly, but smoothly. In Spark, in this case, it does retry only the failed partitions, but we sometimes saw 4 or 5 retries, making it in fact much, much slower.

Yong

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Any suggestion about "sendMessageReliably failed because ack was not received within 120 sec"
Date: Thu, 20 Aug 2015 20:49:52 -0400
Hi, Sparkers:

After the first 2 weeks of Spark in our production cluster, and with more familiarity with Spark, we are more confident about avoiding "Lost executor" failures due to memory issues. So far, most of our jobs won't fail or slow down because of a lost executor. But sometimes I observe that individual tasks fail due to "sendMessageReliably failed because ack was not received within 120 sec". Here is the basic information:

Spark 1.3.1, with 1 master + 42 worker boxes in standalone deployment. The cluster also runs Hadoop + MapReduce, so we allocate about 25% of the resources to Spark. We are conservative with the Spark jobs, using a low number of cores plus a big parallelism/partition count to control memory usage, and so far we have managed to avoid lost executors.

We have one big daily job running with the following configuration:

/opt/spark/bin/spark-shell --jars spark-avro.jar \
  --conf spark.ui.port=4042 \
  --executor-memory 20G --total-executor-cores 168 \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=6000 \
  --conf spark.default.parallelism=6000 \
  --conf spark.shuffle.blockTransferService=nio \
  -i spark.script

168 total cores makes each executor run with 4 threads (168 / 42 = 4). There is no caching needed, so I make the storage memoryFraction very low. nio has been much more robust than netty in our experience.

This big daily job generates over 20000 tasks, and usually they all finish without this issue; but sometimes, for the same job, tasks keep failing due to this error and retrying. Retry may be part of life in a distributed environment, but I want to know what root cause could be behind it and how to avoid it. Should I increase "spark.core.connection.ack.wait.timeout" to fix this error? When this happened, no executor was lost; all were alive. Below is the message in the log; for example, it complained about a timeout connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed because ack was not received within 120 sec
        at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
        at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
        ...
1 more

Then I checked the GC log of the executor on host-121, and I didn't see any obvious problem there either:

12327.896: [GC [PSYoungGen: 4660710K->2330106K(4660736K)] 13954033K->12324769K(18641920K), 0.9448550 secs] [Times: user=16.40 sys=0.03, real=0.94 secs]
12331.085: [GC [PSYoungGen: 4660730K->2330106K(4660736K)] 14655393K->12988787K(18641920K), 0.8691010 secs] [Times: user=14.76 sys=0.03, real=0.86 secs]
12334.869: [GC [PSYoungGen: 4660730K->2330080K(4660736K)] 15319411K->13631046K(18641920K), 0.8940860 secs] [Times: user=15.20 sys=0.04, real=0.89 secs]
12339.050: [GC [PSYoungGen: 4660704K->1990303K(4660736K)] 15961670K->13964847K(18641920K), 0.7512230 secs] [Times: user=13.25 sys=0.02, real=0.75 secs]
12345.046: [GC [PSYoungGen: 4320927K->994234K(4660736K)] 16295471K->14101940K(18641920K), 0.8231710 secs] [Times: user=14.37 sys=0.04, real=0.83 secs]
12345.869: [Full GC [PSYoungGen: 994234K->0K(4660736K)] [ParOldGen: 13107705K->11473827K(13981184K)] 14101940K->11473827K(18641920K) [PSPermGen: 56819K->56819K(57344K)], 8.7725670 secs] [Times: user=143.61 sys=0.13, real=8.77 secs]
12361.444: [GC [PSYoungGen: 2330624K->544K(4660736K)] 13804451K->11474371K(18641920K), 0.0183270 secs] [Times: user=0.26 sys=0.00, real=0.02 secs]
12368.445: [GC [PSYoungGen: 2331168K->352K(4660736K)] 13804995K->11474320K(18641920K), 0.0330490 secs] [Times: user=0.43 sys=0.02, real=0.03 secs]
12375.528: [GC [PSYoungGen: 2330976K->352K(4660736K)] 13804944K->11474368K(18641920K), 0.0253410 secs] [Times: user=0.35 sys=0.00, real=0.03 secs]
12382.833: [GC [PSYoungGen: 2330976K->384K(4660736K)] 13804992K->11474424K(18641920K), 0.0190140 secs] [Times: user=0.28 sys=0.00, real=0.02 secs]
12390.006: [GC [PSYoungGen: 2331008K->576K(4660736K)] 13805048K->11474632K(18641920K), 0.0166370 secs] [Times: user=0.25 sys=0.01, real=0.02 secs]
12397.345: [GC [PSYoungGen: 2331200K->416K(4660736K)] 13805256K->11474504K(18641920K), 0.0159600 secs] [Times: user=0.24 sys=0.00, real=0.01 secs]
12405.098: [GC [PSYoungGen: 2331040K->416K(4660736K)] 13805128K->11474568K(18641920K), 0.0162000 secs] [Times: user=0.23 sys=0.00, real=0.01 secs]
12412.492: [GC [PSYoungGen: 2331040K->416K(4660736K)] 13805192K->11474608K(18641920K), 0.0281690 secs] [Times: user=0.44 sys=0.00, real=0.03 secs]
12419.666: [GC [PSYoungGen: 2331040K->416K(4660736K)] 13805232K->11474624K(18641920K), 0.0155110 secs] [Times: user=0.23 sys=0.00, real=0.02 secs]
12427.648: [GC [PSYoungGen: 2331040K->384K(4660736K)] 13805248K->11474616K(18641920K), 0.0474190 secs] [Times: user=0.51 sys=0.01, real=0.04 secs]
12435.858: [GC [PSYoungGen: 2331008K->320K(4660736K)] 13805240K->11474576K(18641920K), 0.0271780 secs] [Times: user=0.32 sys=0.00, real=0.03 secs]

So the minor GCs finish in tens of milliseconds, and the full GC completes in a few seconds. Any idea how to avoid this case?

Thanks

Yong
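The GC check described in this thread (scanning every executor's GC log for the longest real-time pause, to rule out a pause long enough to explain the 120 sec ack timeout) can be automated. Below is a small sketch of such a scan; `max_gc_pause` is a hypothetical helper, not part of Spark, and it assumes ParallelGC logs in the `[Times: user=... sys=... real=... secs]` format shown above:

```python
import re

# Matches the wall-clock pause in HotSpot GC log entries like
# "[Times: user=143.61 sys=0.13, real=8.77 secs]"
PAUSE_RE = re.compile(r"real=(\d+\.\d+) secs")

def max_gc_pause(log_lines):
    """Return the largest real-time GC pause (in seconds) found in the lines."""
    pauses = [float(m.group(1))
              for line in log_lines
              for m in PAUSE_RE.finditer(line)]
    return max(pauses) if pauses else 0.0

# Two entries shortened from the log above, for illustration:
sample = [
    "12345.869: [Full GC ...] [Times: user=143.61 sys=0.13, real=8.77 secs]",
    "12361.444: [GC ...] [Times: user=0.26 sys=0.00, real=0.02 secs]",
]
print(max_gc_pause(sample))  # prints 8.77, far below the 120 sec timeout
```

Running this over each executor's stdout log would confirm (or rule out) GC pauses as the cause much faster than eyeballing 42 logs.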
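For reference, the timeout increase discussed in the reply at the top of the thread can be applied to this same job by adding two extra --conf flags to the spark-shell invocation. This is a sketch only; the 600 (seconds) value is the one suggested in SPARK-3633, and all other options are unchanged from the job above:

```shell
/opt/spark/bin/spark-shell --jars spark-avro.jar \
  --conf spark.ui.port=4042 \
  --executor-memory 20G --total-executor-cores 168 \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=6000 \
  --conf spark.default.parallelism=6000 \
  --conf spark.shuffle.blockTransferService=nio \
  --conf spark.core.connection.ack.wait.timeout=600 \
  --conf spark.akka.timeout=600 \
  -i spark.script
```

Note that raising the ack timeout only hides whatever is delaying the acks; it does not address the root cause being asked about.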