Hi, All:
We are testing EMR and comparing it with our on-premise HDP setup, using one 
application as the test:
EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs. HDP (2.6.3) with Hadoop 2.7.3 + 
Spark 2.2.0
The application is very simple: it just reads raw Parquet files and then does a 
DS.repartition(id_col).flatMap().write.partitionBy(col).save() operation.
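For reference, the job is roughly the sketch below (the session setup, column 
names, case class and paths are placeholders, not our real code):

import org.apache.spark.sql.SparkSession

// Minimal sketch of the job; id_col, dt, OutRec and the paths are stand-ins.
case class OutRec(id_col: String, dt: String, payload: String)

val spark = SparkSession.builder().appName("repartition-flatmap-test").getOrCreate()
import spark.implicits._

val raw = spark.read.parquet("s3://my-bucket/raw/")           // stage 1: read the raw Parquet files
raw.repartition($"id_col")                                    // shuffle into the (default) 200 partitions
  .flatMap(r => Seq(OutRec(r.getAs[String]("id_col"),         // stand-in for the real flatMap logic
                           r.getAs[String]("dt"),
                           r.mkString(","))))
  .write
  .partitionBy("dt")                                          // produces the dt=2019-08-01/... directories
  .parquet("hdfs:///user/hadoop/permalink/working/output")    // or an s3:// path in the first test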

With the test data on HDP (6 slave nodes, 32 GB each), the whole application 
finishes in around 3 hours, which is fine for us.
The job runs as a Spark application with 2 stages. The 2nd stage runs with the 
default 200 tasks.
On EMR, we observed that 2 of the 200 tasks hang for more than 10 hours while 
the rest finish, and we had to give up.
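Regarding the 200 tasks: as I understand it, they simply come from the default 
spark.sql.shuffle.partitions, which we did not tune on either cluster, e.g.:

// The 2nd stage's task count comes from the shuffle introduced by repartition(id_col);
// 200 is Spark's built-in default for this setting.
spark.conf.get("spark.sql.shuffle.partitions")            // "200" unless overridden
// spark.conf.set("spark.sql.shuffle.partitions", "400")  // we did NOT change this in these tests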

The first test read the raw Parquet files from S3 and wrote the output 
directly back to S3, so I thought it could be an issue with the S3 output 
committer. We then changed the test to read the Parquet files from S3 but use 
the EMR cluster's HDFS as the output location.
To my surprise, we observed the same behavior with HDFS: 2 of the 200 tasks 
hang forever, and they are on different executors. These 2 executors process 
other tasks normally but hang on these 2 tasks, while all the rest finished.

This looks like data skew, but we know it is not: the same application with 
the same data works fine on HDP, and we see well-balanced data across all 200 
tasks.

I then checked the executor logs on EMR more carefully for the HDFS test case, 
and I am confident S3 is not the issue here, as all the raw Parquet data is 
read in the first stage of the job WITHOUT any delay.

Sample log from a finished executor on EMR:
19/08/29 20:18:49 INFO Executor: Finished task 157.0 in stage 2.0 (TID 170). 
3854 bytes result sent to driver
19/08/29 20:18:49 INFO CoarseGrainedExecutorBackend: Got assigned task 179
19/08/29 20:18:49 INFO Executor: Running task 166.0 in stage 2.0 (TID 179)
19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks 
including 1 local blocks and 11 remote blocks
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches 
in 61 ms
19/08/29 20:28:55 INFO FileOutputCommitter: File Output Committer Algorithm 
version is 1
.................

The last log entries from the hanging executor on EMR:
19/08/29 19:40:40 INFO Executor: Finished task 78.0 in stage 2.0 (TID 91). 3854 
bytes result sent to driver
19/08/29 19:40:40 INFO CoarseGrainedExecutorBackend: Got assigned task 101
19/08/29 19:40:40 INFO Executor: Running task 88.0 in stage 2.0 (TID 101)
19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks 
including 1 local blocks and 11 remote blocks
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 1 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 1 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to 
ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection 
to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in 
bootstraps)
19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches 
in 73 ms

This shows that the hanging executor started fetching shuffle data for task 
"101" but never reached "FileOutputCommitter" for this particular task. Other 
tasks (e.g. "91") had finished on this executor before without any issue.
I checked the HDFS output location:
[hadoop@ip-10-51-51-151 ~]$ hadoop fs -ls -R 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01
-rw-r--r--   2 hadoop hadoop  170976376 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02
-rw-r--r--   2 hadoop hadoop  102985213 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03
-rw-r--r--   2 hadoop hadoop   58306503 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN
-rw-r--r--   2 hadoop hadoop  258330267 2019-08-29 19:51 
/user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet

In fact, as far as I can tell, all the intermediate data for task "101" SHOULD 
ALREADY BE DONE on HDFS at "19:51". The output Parquet file sizes are close to 
those of the other tasks that already finished.

So my questions are:

1) What COULD stop these 2 executors from reaching "FileOutputCommitter" in 
Spark 2.4.3 in this case? I really don't believe they were still fetching data 
from the remote shuffle service at this point.
2) This Spark 2.4.3 runs on EMR, and AWS sets the following configurations, 
which may be related to the above issue:

spark.hadoop.yarn.timeline-service.enabled false
spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS $(hostname -f)
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
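In case it helps narrow this down, my assumption is that we could rerun the 
HDFS test with the EMR-optimized committer disabled, roughly as below 
(untested on our side; the override values are my guesses at the non-EMR 
defaults):

import org.apache.spark.sql.SparkSession

// Hedged sketch: fall back from the EMR-optimized Parquet committer to the stock
// Spark/Hadoop committers. The config keys are taken from the list above.
val spark = SparkSession.builder()
  .appName("hdfs-output-committer-check")
  .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "false")
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.sql.execution.datasources.parquet.ParquetOutputCommitter")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")  // likely a no-op: the log above already shows v1 for HDFS
  .getOrCreate()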

Can anyone give me some idea what could cause this issue?

Thanks

Yong
