Hi,

I'm running Spark 3.0.1 on AWS with dynamic allocation disabled. I'm caching
a large dataset 100% in memory. Before caching it, I coalesce the dataset to
1792 partitions. The cluster has 112 executors and 896 cores.
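
Roughly, the setup looks like this (a minimal sketch in Scala; the input
path, app name, and MEMORY_ONLY storage level are placeholders/assumptions,
not the actual job):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    // Dynamic allocation is off; executors and cores are fixed
    // (112 executors, 896 cores total, i.e. 8 cores per executor).
    val spark = SparkSession.builder()
      .appName("cached-dataset-job")                 // placeholder name
      .config("spark.dynamicAllocation.enabled", "false")
      .getOrCreate()

    // Placeholder input; the real dataset is ~1.4 TiB once cached.
    val ds = spark.read.parquet("s3://bucket/path")
      .coalesce(1792)                                // 1792 partitions before caching
      .persist(StorageLevel.MEMORY_ONLY)             // cached 100% in memory

    ds.count()                                       // materialize the cache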

The next stage reads those 1792 partitions as its input. The query plan is:

1. InMemoryScan (MapPartitionsRDD1 -> MapPartitionsRDD2)
2. WholeStageCodegen (MapPartitionsRDD)
3. ShuffleQueryStage (MapPartitionsRDD)

I've set *spark.locality.wait* to 120 seconds because I want to make sure
that, no matter what, all tasks run PROCESS_LOCAL and use the local
executor cache. I expect each executor to run *exactly 16 tasks*.
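
To be explicit about the configuration and the arithmetic behind that
expectation (a standalone sketch; in the real job the setting is part of the
builder above, and the per-level waits spark.locality.wait.process/.node/.rack
are not overridden, so to my understanding they fall back to the 120 s value):

    import org.apache.spark.sql.SparkSession

    // Raise the locality wait so the scheduler holds out for a PROCESS_LOCAL
    // slot before falling back to NODE_LOCAL, then RACK_LOCAL, then ANY.
    val sparkWithLocalityWait = SparkSession.builder()
      .config("spark.locality.wait", "120s")
      .getOrCreate()

    // 1792 cached partitions over 112 executors (8 cores each) should give
    // exactly 1792 / 112 = 16 PROCESS_LOCAL tasks per executor.
    val expectedTasksPerExecutor = 1792 / 112        // 16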


The *problem* I see is that this stage runs 1770 tasks, then shows
1770/1792 tasks completed with no other tasks running. It then waits exactly
120 seconds and schedules the 22 remaining tasks as RACK_LOCAL. Those 22
tasks run very slowly compared to the rest. In the end, some executors show
17 tasks and some show 15.

The stage summary UI shows:

   - *Total Time Across All Tasks:* 1.7 h
   - *Locality Level Summary:* Process local: 1770; Rack local: 22
   - *Input Size / Records:* 1418.9 GiB / 9928798
   - *Shuffle Write Size / Records:* 2.5 GiB / 35494236

The slowest PROCESS_LOCAL task:

   - *Index:* 477
   - *Task ID:* 4590
   - *Attempt:* 0
   - *Status:* SUCCESS
   - *Locality Level:* PROCESS_LOCAL
   - *Executor ID:* 9
   - *Host:* ip-xxxx.ec2.internal
   - *Logs:* stdout <http://ip-10-99-88-233.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000022/ec2-user/stdout?start=-4096>, stderr <http://ip-10-99-88-233.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000022/ec2-user/stderr?start=-4096>
   - *Launch Time:* 2021-06-16 12:59:25
   - *Duration:* 6 s
   - *GC Time:* 2 s
   - *Input Size / Records:* 1.2 GiB / 311
   - *Write Time:* 7.0 ms
   - *Shuffle Write Size / Records:* 1.8 MiB / 25474

and the slowest RACK_LOCAL task:

   - *Index:* 792
   - *Task ID:* 5876
   - *Attempt:* 0
   - *Status:* SUCCESS
   - *Locality Level:* RACK_LOCAL
   - *Executor ID:* 28
   - *Host:* ip-xxxx.ec2.internal
   - *Logs:* stdout <http://ip-10-99-89-160.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000061/ec2-user/stdout?start=-4096>, stderr <http://ip-10-99-89-160.ec2.internal:8042/node/containerlogs/container_1623664711892_2541_01_000061/ec2-user/stderr?start=-4096>
   - *Launch Time:* 2021-06-16 13:01:31
   - *Duration:* 1.1 min
   - *GC Time:* 0.2 s
   - *Input Size / Records:* 384.6 MiB / 642897
   - *Write Time:* 3.0 ms
   - *Shuffle Write Size / Records:* 1.6 MiB / 21786

I don't understand what is causing this. Why is Spark refusing to run those
tasks PROCESS_LOCAL? What is happening with the 22 remaining tasks, and why
do they become RACK_LOCAL? One interesting thing to note is that these 22
tasks show a much higher input record count. I understand that these tasks
could be skewed, but why do they not start running PROCESS_LOCAL instead of
waiting out the locality timer before being scheduled?

Any insight greatly appreciated,
Thanks
