Baohe Zhang created SPARK-34779:
-----------------------------------

             Summary: ExecutorMetricsPoller should keep stage entry in stageTCMP 
until a heartbeat occurs
                 Key: SPARK-34779
                 URL: https://issues.apache.org/jira/browse/SPARK-34779
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.1.1, 3.1.0, 3.0.2, 3.0.1, 3.0.0
            Reporter: Baohe Zhang


The current implementation of ExecutorMetricsPoller uses the task count in each 
stage to decide whether to keep a stage entry. When the executor has only 1 core, 
this can cause two issues:
 # Peak metrics missing (the stage entry is removed within a heartbeat 
interval).
 # Unnecessary and frequent hashmap entry removal and insertion.

Assuming an executor with 1 core has 2 tasks (task1 and task2, both belonging to 
stage (0, 0)) to execute within a heartbeat interval, the workflow in the current 
ExecutorMetricsPoller implementation would be:

1. task1 start -> stage (0, 0) entry created in stageTCMP, task count incremented 
to 1

2. 1st poll() -> update peak metrics of stage (0, 0)

3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry 
removed, peak metrics lost.

4. task2 start -> stage (0, 0) entry created in stageTCMP, task count incremented 
to 1

5. 2nd poll() -> update peak metrics of stage (0, 0)

6. task2 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry 
removed, peak metrics lost.

7. heartbeat() -> empty or inaccurate peak metrics for stage (0, 0) reported.
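The steps above can be replayed with a toy model (illustrative Python, not the 
actual Spark code; the real stageTCMP is a concurrent map keyed by stage and 
attempt id, and the entry layout here is simplified):

```python
# Toy model of today's stageTCMP behavior on a 1-core executor.
# Key: (stage_id, attempt_id); value: [task_count, peak_metric].
stage_tcmp = {}

def task_start(stage):
    stage_tcmp.setdefault(stage, [0, 0])[0] += 1

def poll(stage, metric):
    # poll() only updates the peak if the stage entry still exists
    if stage in stage_tcmp:
        entry = stage_tcmp[stage]
        entry[1] = max(entry[1], metric)

def task_end(stage):
    entry = stage_tcmp[stage]
    entry[0] -= 1
    if entry[0] == 0:
        del stage_tcmp[stage]   # entry dropped: the polled peak is lost

# Replaying steps 1-6 above:
task_start((0, 0)); poll((0, 0), 100); task_end((0, 0))
task_start((0, 0)); poll((0, 0), 80);  task_end((0, 0))
# Step 7: heartbeat() finds no entry for stage (0, 0), so nothing is reported.
print(stage_tcmp)   # {}
```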

We can fix the issue by keeping entries with task count = 0 in the stageTCMP map 
until a heartbeat occurs. At each heartbeat, after reporting the peak metrics for 
every stage, we scan stageTCMP and remove the entries whose task count is 0.

After the fix, the workflow would be:

1. task1 start -> stage (0, 0) entry created in stageTCMP, task count incremented 
to 1

2. 1st poll() -> update peak metrics of stage (0, 0)

3. task1 end -> stage (0, 0) task count decremented to 0, but the entry for 
stage (0, 0) remains.

4. task2 start -> task count of stage (0, 0) incremented to 1

5. 2nd poll() -> update peak metrics of stage (0, 0)

6. task2 end -> stage (0, 0) task count decremented to 0, but the entry for 
stage (0, 0) remains.

7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. The entry for 
stage (0, 0) is then removed from stageTCMP because its task count is 0.
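The fixed flow can be sketched the same way (illustrative Python, not the actual 
Spark code): entries with task count 0 survive until heartbeat() has reported 
their peaks, and only then are pruned.

```python
# Toy model of the proposed fix on a 1-core executor.
# Key: (stage_id, attempt_id); value: [task_count, peak_metric].
stage_tcmp = {}

def task_start(stage):
    stage_tcmp.setdefault(stage, [0, 0])[0] += 1

def poll(stage, metric):
    if stage in stage_tcmp:
        entry = stage_tcmp[stage]
        entry[1] = max(entry[1], metric)

def task_end(stage):
    stage_tcmp[stage][0] -= 1   # keep the entry even when the count hits 0

def heartbeat():
    reported = {stage: peak for stage, (count, peak) in stage_tcmp.items()}
    # prune only after reporting, and only entries with no running tasks
    for stage in [s for s, (count, _) in stage_tcmp.items() if count == 0]:
        del stage_tcmp[stage]
    return reported

# Replaying steps 1-7 above:
task_start((0, 0)); poll((0, 0), 100); task_end((0, 0))
task_start((0, 0)); poll((0, 0), 80);  task_end((0, 0))
reported = heartbeat()
print(reported)     # {(0, 0): 100} -- the true peak survives
print(stage_tcmp)   # {} -- entry removed because its task count was 0
```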

 

How to verify the behavior?

Submit a job with a custom metrics polling interval (e.g., 2s) and 
spark.executor.cores=1, then check the debug logs of ExecutorMetricsPoller.
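A sketch of such a run (the application class and jar are placeholders; 
spark.executor.metrics.pollingInterval is the Spark 3.x setting for the executor 
metrics polling interval, and DEBUG logging must be enabled for 
org.apache.spark.executor.ExecutorMetricsPoller in the executor's log4j 
configuration):

```shell
# Illustrative invocation; com.example.MyApp and my-app.jar are placeholders.
spark-submit \
  --conf spark.executor.cores=1 \
  --conf spark.executor.metrics.pollingInterval=2s \
  --class com.example.MyApp \
  my-app.jar
```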



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
