[jira] [Updated] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister

2024-05-28 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-48394:
-
Description: 
There is only one valid mapstatus for the same {{mapIndex}} at the same time in 
Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid chaos.

 

The issue leads to shuffle fetch failures and, ultimately, job failure. It 
happens this way:
 # Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on executor 
e1
 # Executor e1 starts decommission
 # Executor e1 reports a false-positive loss to the driver during its decommission
 # Stage B reuses the shuffle dependency of Stage A and computes partition P0 
again with task t2 on executor e2
 # When task t2 finishes, we see two items ((t1 -> P0), (t2 -> P0)) for the 
same partition in {{mapIdToMapIndex}} but only one item 
(mapStatuses(P0)=MapStatus(t2, e2)) in {{mapStatuses}}.
 # Executor e1 starts to migrate task t1's mapstatus (to executor e3, for 
example) and calls {{updateMapOutput}} on the driver. Given step 5, we'd use 
the mapId (i.e., t1) to get the mapIndex (i.e., P0) and then use P0 to look up 
task t2's mapstatus.

// updateMapOutput
val mapIndex = mapIdToMapIndex.get(mapId)
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
 # Task t2's mapstatus location would then be updated to executor e3 even 
though the data is actually still on executor e2. This ultimately leads to the 
fetch failure.
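Below is a minimal, self-contained sketch of the invariant the cleanup enforces; the class and field names are simplified assumptions modeled on the description above, not the actual MapOutputTracker/ShuffleStatus code. The point is that registering or unregistering a map output for a mapIndex must also drop the stale mapId entry from {{mapIdToMapIndex}}, so a late {{updateMapOutput}} for the old mapId becomes a no-op.
{code:scala}
// Simplified driver-side bookkeeping; illustrative only, not Spark internals.
import scala.collection.mutable

case class MapStatus(mapId: Long, execId: String)

class ShuffleBookkeeping(numPartitions: Int) {
  private val mapStatuses = new Array[MapStatus](numPartitions)
  private val mapIdToMapIndex = mutable.HashMap.empty[Long, Int]

  // Registering a new mapstatus for mapIndex evicts the previous mapId entry,
  // otherwise both (t1 -> P0) and (t2 -> P0) survive, as in step 5 above.
  def addMapOutput(mapIndex: Int, status: MapStatus): Unit = {
    Option(mapStatuses(mapIndex)).foreach(old => mapIdToMapIndex.remove(old.mapId))
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  // Unregistering must clean up mapIdToMapIndex as well (what this ticket asks for).
  def removeMapOutput(mapIndex: Int): Unit = {
    Option(mapStatuses(mapIndex)).foreach(old => mapIdToMapIndex.remove(old.mapId))
    mapStatuses(mapIndex) = null
  }

  // With the cleanup above, a migration reported for a stale mapId finds no entry
  // and leaves the current mapstatus untouched (steps 6/7 can no longer corrupt it).
  def updateMapOutput(mapId: Long, newExecId: String): Unit = {
    val mapIndexOpt = mapIdToMapIndex.get(mapId)
    for {
      mapIndex <- mapIndexOpt
      status <- Option(mapStatuses(mapIndex))
    } mapStatuses(mapIndex) = status.copy(execId = newExecId)
  }
}
{code}
With this rule, step 6 above would find no entry for t1 and leave task t2's mapstatus untouched.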

  was:There is only one valid mapstatus for the same {{mapIndex}} at the same 
time in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid 
chaos.


> Cleanup mapIdToMapIndex on mapoutput unregister
> ---
>
> Key: SPARK-48394
> URL: https://issues.apache.org/jira/browse/SPARK-48394
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0, 4.0.0, 3.5.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> There is only one valid mapstatus for the same {{mapIndex}} at the same time 
> in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid 
> chaos.
>  
> The issue leads to shuffle fetch failures and, ultimately, job failure. It 
> happens this way:
>  # Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on 
> executor e1
>  # Executor e1 starts decommission
>  # Executor e1 reports a false-positive loss to the driver during its decommission
>  # Stage B reuses the shuffle dependency of Stage A and computes 
> partition P0 again with task t2 on executor e2
>  # When task t2 finishes, we see two items ((t1 -> P0), (t2 -> P0)) for the 
> same partition in {{mapIdToMapIndex}} but only one item 
> (mapStatuses(P0)=MapStatus(t2, e2)) in {{mapStatuses}}.
>  # Executor e1 starts to migrate task t1's mapstatus (to executor e3, for 
> example) and calls {{updateMapOutput}} on the driver. Given step 5, we'd use 
> the mapId (i.e., t1) to get the mapIndex (i.e., P0) and then use P0 to look up 
> task t2's mapstatus.
> // updateMapOutput
> val mapIndex = mapIdToMapIndex.get(mapId)
> val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
>  # Task t2's mapstatus location would then be updated to executor e3 even 
> though the data is actually still on executor e2. This ultimately leads to 
> the fetch failure.






[jira] [Updated] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister

2024-05-28 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-48394:
-
Issue Type: Bug  (was: Improvement)

> Cleanup mapIdToMapIndex on mapoutput unregister
> ---
>
> Key: SPARK-48394
> URL: https://issues.apache.org/jira/browse/SPARK-48394
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0, 4.0.0, 3.5.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> There is only one valid mapstatus for the same {{mapIndex}} at the same time 
> in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid 
> chaos.






[jira] [Created] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister

2024-05-22 Thread wuyi (Jira)
wuyi created SPARK-48394:


 Summary: Cleanup mapIdToMapIndex on mapoutput unregister
 Key: SPARK-48394
 URL: https://issues.apache.org/jira/browse/SPARK-48394
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.5.1, 3.5.0, 4.0.0
Reporter: wuyi


There is only one valid mapstatus for the same {{mapIndex}} at the same time in 
Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid chaos.






[jira] [Updated] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts

2023-11-22 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-46052:
-
Description: 
Spark has two functions to kill all tasks in a stage:
* `cancelTasks`: kills all the running tasks in all the stage attempts and also 
aborts all the stage attempts
* `killAllTaskAttempts`: only kills all the running tasks in all the stage 
attempts but doesn't abort the attempts

However, there's no case in Spark where a stage would launch new tasks after 
all of its tasks get killed. So I think we can replace `killAllTaskAttempts` 
with `cancelTasks` directly.
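For illustration only, here is a hedged sketch of what the consolidation could look like at a call site; the trait and signatures below are assumptions modeled on the description, not the exact TaskScheduler API.
{code:scala}
// Hypothetical sketch; method names follow the description, signatures are assumptions.
trait StageTaskKiller {
  // Kills running tasks in all attempts of the stage AND aborts the stage attempts.
  def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
  // Only kills running tasks without aborting the attempts (the method to be removed).
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
}

object StageTaskKiller {
  // A caller that previously used killAllTaskAttempts would switch to cancelTasks,
  // since no stage launches new tasks after all of its tasks are killed anyway.
  def onStageCancelled(scheduler: StageTaskKiller, stageId: Int): Unit = {
    scheduler.cancelTasks(stageId, interruptThread = false,
      reason = s"Stage $stageId was cancelled")
  }
}
{code}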

  was:killAllTaskAttempts can be replaced with cancelTasks


> Remove unnecessary TaskScheduler.killAllTaskAttempts
> 
>
> Key: SPARK-46052
> URL: https://issues.apache.org/jira/browse/SPARK-46052
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.3, 3.2.4, 3.3.3, 3.4.1, 3.5.0
>Reporter: wuyi
>Priority: Major
>  Labels: pull-request-available
>
> Spark has two functions to kill all tasks in a stage:
> * `cancelTasks`: kills all the running tasks in all the stage attempts and 
> also aborts all the stage attempts
> * `killAllTaskAttempts`: only kills all the running tasks in all the stage 
> attempts but doesn't abort the attempts
> However, there's no case in Spark where a stage would launch new tasks after 
> all of its tasks get killed. So I think we can replace `killAllTaskAttempts` 
> with `cancelTasks` directly.






[jira] [Created] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts

2023-11-22 Thread wuyi (Jira)
wuyi created SPARK-46052:


 Summary: Remove unnecessary TaskScheduler.killAllTaskAttempts
 Key: SPARK-46052
 URL: https://issues.apache.org/jira/browse/SPARK-46052
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.5.0, 3.4.1, 3.3.3, 3.2.4, 3.1.3, 3.0.3
Reporter: wuyi


killAllTaskAttempts can be replaced with cancelTasks






[jira] [Commented] (SPARK-45527) Task fraction resource request is not expected

2023-10-12 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774756#comment-17774756
 ] 

wuyi commented on SPARK-45527:
--

cc [~wbo4958]   [~tgraves] 

> Task fraction resource request is not expected
> --
>
> Key: SPARK-45527
> URL: https://issues.apache.org/jira/browse/SPARK-45527
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.1, 3.3.3, 3.4.1, 3.5.0
>Reporter: wuyi
>Priority: Major
>
>  
> {code:java}
> test("SPARK-XXX") {
>   import org.apache.spark.resource.{ResourceProfileBuilder, 
> TaskResourceRequests}
>   withTempDir { dir =>
> val scriptPath = createTempScriptWithExpectedOutput(dir, 
> "gpuDiscoveryScript",
>   """{"name": "gpu","addresses":["0"]}""")
> val conf = new SparkConf()
>   .setAppName("test")
>   .setMaster("local-cluster[1, 12, 1024]")
>   .set("spark.executor.cores", "12")
> conf.set(TASK_GPU_ID.amountConf, "0.08")
> conf.set(WORKER_GPU_ID.amountConf, "1")
> conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
> conf.set(EXECUTOR_GPU_ID.amountConf, "1")
> sc = new SparkContext(conf)
> val rdd = sc.range(0, 100, 1, 4)
> var rdd1 = rdd.repartition(3)
> val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
> val rp = new ResourceProfileBuilder().require(treqs).build
> rdd1 = rdd1.withResources(rp)
> assert(rdd1.collect().size === 100)
>   }
> } {code}
> In the above test, the 3 tasks generated by rdd1 are expected to run 
> sequentially, since "new TaskResourceRequests().cpus(1).resource("gpu", 
> 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, 
> those 3 tasks actually run in parallel.
> The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. 
> In this case, "gpu.numParts" is initialized to 12 (1/0.08) and won't 
> change even if there's a new task resource request (e.g., resource("gpu", 
> 1.0) in this case). Thus, those 3 tasks can run in parallel.
>  






[jira] [Created] (SPARK-45527) Task fraction resource request is not expected

2023-10-12 Thread wuyi (Jira)
wuyi created SPARK-45527:


 Summary: Task fraction resource request is not expected
 Key: SPARK-45527
 URL: https://issues.apache.org/jira/browse/SPARK-45527
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.0, 3.4.1, 3.3.3, 3.2.1
Reporter: wuyi


 
{code:java}
test("SPARK-XXX") {
  import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}

  withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
  """{"name": "gpu","addresses":["0"]}""")

val conf = new SparkConf()
  .setAppName("test")
  .setMaster("local-cluster[1, 12, 1024]")
  .set("spark.executor.cores", "12")
conf.set(TASK_GPU_ID.amountConf, "0.08")
conf.set(WORKER_GPU_ID.amountConf, "1")
conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
conf.set(EXECUTOR_GPU_ID.amountConf, "1")
sc = new SparkContext(conf)
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
assert(rdd1.collect().size === 100)
  }
} {code}
In the above test, the 3 tasks generated by rdd1 are expected to run 
sequentially, since "new TaskResourceRequests().cpus(1).resource("gpu", 1.0)" 
should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, those 3 
tasks actually run in parallel.

The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. In 
this case, "gpu.numParts" is initialized to 12 (1/0.08) and won't change 
even if there's a new task resource request (e.g., resource("gpu", 1.0) in this 
case). Thus, those 3 tasks can run in parallel.
 






[jira] [Updated] (SPARK-45527) Task fraction resource request is not expected

2023-10-12 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-45527:
-
Description: 
 
{code:java}
test("SPARK-XXX") {
  import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}

  withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
  """{"name": "gpu","addresses":["0"]}""")

val conf = new SparkConf()
  .setAppName("test")
  .setMaster("local-cluster[1, 12, 1024]")
  .set("spark.executor.cores", "12")
conf.set(TASK_GPU_ID.amountConf, "0.08")
conf.set(WORKER_GPU_ID.amountConf, "1")
conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
conf.set(EXECUTOR_GPU_ID.amountConf, "1")
sc = new SparkContext(conf)
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
assert(rdd1.collect().size === 100)
  }
} {code}
In the above test, the 3 tasks generated by rdd1 are expected to run 
sequentially, since "new TaskResourceRequests().cpus(1).resource("gpu", 1.0)" 
should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, those 3 
tasks actually run in parallel.

The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. In 
this case, "gpu.numParts" is initialized to 12 (1/0.08) and won't change 
even if there's a new task resource request (e.g., resource("gpu", 1.0) in this 
case). Thus, those 3 tasks can run in parallel.
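To make the slot accounting concrete, here is a small standalone sketch of the arithmetic described above; the variable names are illustrative assumptions, not the real ExecutorResourceInfo code.
{code:scala}
// Standalone illustration of why a static numParts over-parallelizes; not Spark code.
object GpuSlotMath {
  def main(args: Array[String]): Unit = {
    val gpuAddresses = 1                 // one physical GPU on the executor
    val startupTaskGpuAmount = 0.08      // spark.task.resource.gpu.amount at startup

    // numParts is derived once from the startup config: 1 / 0.08 = 12 slots per GPU.
    val numParts = (1.0 / startupTaskGpuAmount).toInt
    val slotsSchedulerSees = gpuAddresses * numParts
    println(s"slots the scheduler sees: $slotsSchedulerSees")       // 12

    // The new ResourceProfile asks for a whole GPU per task, so only 1 slot should exist.
    val newTaskGpuAmount = 1.0
    val slotsProfileExpects = gpuAddresses * (1.0 / newTaskGpuAmount).toInt
    println(s"slots the new profile expects: $slotsProfileExpects") // 1

    // Because numParts is static, the 3 repartitioned tasks all fit into the 12 stale
    // slots and get launched in parallel instead of sequentially.
    println(s"tasks launched concurrently: ${math.min(3, slotsSchedulerSees)}") // 3
  }
}
{code}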
 

  was:
 
{code:java}
test("SPARK-XXX") {
  import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}

  withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
  """{"name": "gpu","addresses":["0"]}""")

val conf = new SparkConf()
  .setAppName("test")
  .setMaster("local-cluster[1, 12, 1024]")
  .set("spark.executor.cores", "12")
conf.set(TASK_GPU_ID.amountConf, "0.08")
conf.set(WORKER_GPU_ID.amountConf, "1")
conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
conf.set(EXECUTOR_GPU_ID.amountConf, "1")
sc = new SparkContext(conf)
val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.withResources(rp)
assert(rdd1.collect().size === 100)
  }
} {code}
In the above test, the 3 tasks generated by rdd1 are expected to run 
sequentially, since "new TaskResourceRequests().cpus(1).resource("gpu", 1.0)" 
should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, those 3 
tasks actually run in parallel.

The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. In 
this case, "gpu.numParts" is initialized to 12 (1/0.08) and won't change 
even if there's a new task resource request (e.g., resource("gpu", 1.0) in this 
case). Thus, those 3 tasks can run in parallel.
 


> Task fraction resource request is not expected
> --
>
> Key: SPARK-45527
> URL: https://issues.apache.org/jira/browse/SPARK-45527
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.1, 3.3.3, 3.4.1, 3.5.0
>Reporter: wuyi
>Priority: Major
>
>  
> {code:java}
> test("SPARK-XXX") {
>   import org.apache.spark.resource.{ResourceProfileBuilder, 
> TaskResourceRequests}
>   withTempDir { dir =>
> val scriptPath = createTempScriptWithExpectedOutput(dir, 
> "gpuDiscoveryScript",
>   """{"name": "gpu","addresses":["0"]}""")
> val conf = new SparkConf()
>   .setAppName("test")
>   .setMaster("local-cluster[1, 12, 1024]")
>   .set("spark.executor.cores", "12")
> conf.set(TASK_GPU_ID.amountConf, "0.08")
> conf.set(WORKER_GPU_ID.amountConf, "1")
> conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
> conf.set(EXECUTOR_GPU_ID.amountConf, "1")
> sc = new SparkContext(conf)
> val rdd = sc.range(0, 100, 1, 4)
> var rdd1 = rdd.repartition(3)
> val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
> val rp = new ResourceProfileBuilder().require(treqs).build
> rdd1 = rdd1.withResources(rp)
> assert(rdd1.collect().size === 100)
>   }
> } {code}
> In the above test, the 3 tasks generated by rdd1 are expected to run 
> sequentially, since "new TaskResourceRequests().cpus(1).resource("gpu", 
> 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, 
> those 3 tasks actually run in parallel.
> The root cause is that 

[jira] [Commented] (SPARK-45057) Deadlock caused by rdd replication level of 2

2023-09-27 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769443#comment-17769443
 ] 

wuyi commented on SPARK-45057:
--

In the case of "Received UploadBlock request from T1 (blocked by T4)", 
shouldn't it be blocked by T3?

> Deadlock caused by rdd replication level of 2
> -
>
> Key: SPARK-45057
> URL: https://issues.apache.org/jira/browse/SPARK-45057
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Zhongwei Zhu
>Priority: Major
>  Labels: pull-request-available
>
>  
> When 2 tasks try to compute the same rdd with replication level of 2 while 
> running on only 2 executors, a deadlock will happen.
> A task only releases the lock after writing to the local machine and 
> replicating to the remote executor.
>  
> ||Time||Exe 1 (Task Thread T1)||Exe 1 (Shuffle Server Thread T2)||Exe 2 (Task 
> Thread T3)||Exe 2 (Shuffle Server Thread T4)||
> |T0|write lock of rdd| | | |
> |T1| | |write lock of rdd| |
> |T2|replicate -> UploadBlockSync (blocked by T4)| | | |
> |T3| | | |Received UploadBlock request from T1 (blocked by T4)|
> |T4| | |replicate -> UploadBlockSync (blocked by T2)| |
> |T5| |Received UploadBlock request from T3 (blocked by T1)| | |
> |T6|Deadlock|Deadlock|Deadlock|Deadlock|






[jira] [Created] (SPARK-45310) Mapstatus location type changed from external shuffle service to executor after decommission migration

2023-09-25 Thread wuyi (Jira)
wuyi created SPARK-45310:


 Summary: Mapstatus location type changed from external shuffle 
service to executor after decommission migration
 Key: SPARK-45310
 URL: https://issues.apache.org/jira/browse/SPARK-45310
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.5.0, 3.4.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3
Reporter: wuyi


When migrating shuffle blocks during decommission, the updated mapstatus 
location doesn't respect the external shuffle service location when the 
external shuffle service is enabled.
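A minimal sketch of the intended behaviour, assuming a helper that rewrites the reported location to the shuffle-service endpoint when the service is enabled; the types, port, and field names here are illustrative, not the actual BlockManagerId/MapStatus code.
{code:scala}
// Illustrative only: normalize a migrated block's reported location back to the
// external shuffle service endpoint. All names are assumptions.
object MigratedLocation {
  case class Location(host: String, port: Int, execId: String)

  def normalized(
      executorLoc: Location,
      shuffleServiceEnabled: Boolean,
      shuffleServicePort: Int): Location = {
    if (shuffleServiceEnabled) {
      // Keep the host that now holds the blocks, but report the shuffle service
      // port so the mapstatus location type stays "external shuffle service".
      executorLoc.copy(port = shuffleServicePort, execId = "external-shuffle-service")
    } else {
      executorLoc
    }
  }

  def main(args: Array[String]): Unit = {
    // After migrating to executor e3 on host-b with ESS on port 7337, the mapstatus
    // should point at host-b:7337 rather than at the executor itself.
    val loc = normalized(Location("host-b", 43211, "e3"),
      shuffleServiceEnabled = true, shuffleServicePort = 7337)
    println(loc)
  }
}
{code}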






[jira] [Created] (SPARK-42577) A large stage could run indefinitely due to executor lost

2023-02-25 Thread wuyi (Jira)
wuyi created SPARK-42577:


 Summary: A large stage could run indefinitely due to executor lost
 Key: SPARK-42577
 URL: https://issues.apache.org/jira/browse/SPARK-42577
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.2, 3.2.3, 3.1.3, 3.0.3
Reporter: wuyi


When a stage is extremely large and Spark runs on spot instances or problematic 
clusters with frequent worker/executor loss, the stage could run indefinitely 
due to task reruns caused by the executor loss. This happens when the external 
shuffle service is on and the large stage takes hours to complete: when Spark 
tries to submit a child stage, it finds that the parent stage (the large one) 
has missing partitions, so the large stage has to rerun. When it completes 
again, it finds new missing partitions for the same reason.

We should add an attempt limit for this kind of scenario.
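As a rough sketch of what such a limit could look like (the counter placement and class below are hypothetical illustration, not the DAGScheduler change itself): count how many times the same stage is resubmitted because of missing map outputs and abort once a limit is exceeded.
{code:scala}
// Hypothetical attempt-limit guard; names and structure are illustrative only.
import scala.collection.mutable

class StageResubmitGuard(maxResubmits: Int) {
  private val resubmits = mutable.HashMap.empty[Int, Int].withDefaultValue(0)

  /** Returns true if the stage may be resubmitted, false if the job should abort. */
  def onMissingPartitions(stageId: Int): Boolean = {
    resubmits(stageId) += 1
    resubmits(stageId) <= maxResubmits
  }

  /** A completion with no missing partitions resets the counter for the stage. */
  def onStageCompleted(stageId: Int): Unit = resubmits.remove(stageId)
}

object StageResubmitGuard {
  def main(args: Array[String]): Unit = {
    // Usage sketch: the scheduler would consult the guard before rerunning the parent.
    val guard = new StageResubmitGuard(maxResubmits = 4)
    val canRetry = (1 to 5).map(_ => guard.onMissingPartitions(stageId = 7))
    println(canRetry) // Vector(true, true, true, true, false): the fifth rerun aborts
  }
}
{code}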






[jira] [Created] (SPARK-41958) Disallow arbitrary custom classpath with proxy user in cluster mode

2023-01-09 Thread wuyi (Jira)
wuyi created SPARK-41958:


 Summary: Disallow arbitrary custom classpath with proxy user in 
cluster mode
 Key: SPARK-41958
 URL: https://issues.apache.org/jira/browse/SPARK-41958
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.2.3, 3.3.1, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi


To avoid arbitrary classpaths in the Spark cluster.






[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2023-01-02 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653781#comment-17653781
 ] 

wuyi commented on SPARK-41497:
--

> If I am not wrong, SQL makes very heavy use of accumulators, and so most 
> stages will end up having them anyway - right ?

Right.

 

> I would expect this scenario (even without accumulator) to be fairly low 
> frequency enough that the cost of extra recomputation might be fine.
 
Agreed. So shall we proceed with the improved Option 4 you proposed, 
[~mridulm80]? [~ivoson] can help with the fix.
 

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulators could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find the complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initialize an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same as with 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is that this can also 
> affect the cases where the rdd cache could be reused across attempts (e.g., 
> when there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task is trying to load the rdd 
> cache: this defines an rdd cache as valid/accessible only if the task has 
> succeeded. This could be either overkill or a bit complex (because 
> currently Spark cleans up the task state once it’s finished, so we would need 
> to maintain a structure to know whether a task once succeeded or not).




[jira] [Updated] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile

2023-01-02 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-41848:
-
Priority: Blocker  (was: Major)

> Tasks are over-scheduled with TaskResourceProfile
> -
>
> Key: SPARK-41848
> URL: https://issues.apache.org/jira/browse/SPARK-41848
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: wuyi
>Priority: Blocker
>
> {code:java}
> test("SPARK-XXX") {
>   val conf = new 
> SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]")
>   sc = new SparkContext(conf)
>   val req = new TaskResourceRequests().cpus(3)
>   val rp = new ResourceProfileBuilder().require(req).build()
>   val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x =>
> Thread.sleep(5000)
> x * 2
>   }.collect()
>   assert(res === Array(0, 2))
> } {code}
> In this test, tasks are supposed to be scheduled sequentially since each task 
> requires 3 cores but the executor only has 4 cores. However, we noticed from 
> the logs that 2 tasks are launched concurrently.
> It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset 
> for task scheduling:
> {code:java}
> val rpId = taskSet.taskSet.resourceProfileId
> val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, 
> conf) {code}
> but the ResourceProfile (taskCpus=1) of the executor for updating the free 
> cores in ExecutorData:
> {code:java}
> val rpId = executorData.resourceProfileId
> val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
> executorData.freeCores -= taskCpus {code}
> which results in the inconsistency of the available cores.






[jira] [Commented] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile

2023-01-02 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653739#comment-17653739
 ] 

wuyi commented on SPARK-41848:
--

cc [~ivoson] 

> Tasks are over-scheduled with TaskResourceProfile
> -
>
> Key: SPARK-41848
> URL: https://issues.apache.org/jira/browse/SPARK-41848
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: wuyi
>Priority: Major
>
> {code:java}
> test("SPARK-XXX") {
>   val conf = new 
> SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]")
>   sc = new SparkContext(conf)
>   val req = new TaskResourceRequests().cpus(3)
>   val rp = new ResourceProfileBuilder().require(req).build()
>   val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x =>
> Thread.sleep(5000)
> x * 2
>   }.collect()
>   assert(res === Array(0, 2))
> } {code}
> In this test, tasks are supposed to be scheduled sequentially since each task 
> requires 3 cores but the executor only has 4 cores. However, we noticed from 
> the logs that 2 tasks are launched concurrently.
> It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset 
> for task scheduling:
> {code:java}
> val rpId = taskSet.taskSet.resourceProfileId
> val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, 
> conf) {code}
> but the ResourceProfile (taskCpus=1) of the executor for updating the free 
> cores in ExecutorData:
> {code:java}
> val rpId = executorData.resourceProfileId
> val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
> executorData.freeCores -= taskCpus {code}
> which results in the inconsistency of the available cores.






[jira] [Created] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile

2023-01-02 Thread wuyi (Jira)
wuyi created SPARK-41848:


 Summary: Tasks are over-scheduled with TaskResourceProfile
 Key: SPARK-41848
 URL: https://issues.apache.org/jira/browse/SPARK-41848
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: wuyi


{code:java}
test("SPARK-XXX") {
  val conf = new 
SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]")
  sc = new SparkContext(conf)
  val req = new TaskResourceRequests().cpus(3)
  val rp = new ResourceProfileBuilder().require(req).build()

  val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x =>
Thread.sleep(5000)
x * 2
  }.collect()
  assert(res === Array(0, 2))
} {code}
In this test, tasks are supposed to be scheduled sequentially since each task 
requires 3 cores but the executor only has 4 cores. However, we noticed from 
the logs that 2 tasks are launched concurrently.

It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset 
for task scheduling:
{code:java}
val rpId = taskSet.taskSet.resourceProfileId
val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, 
conf) {code}
but the ResourceProfile (taskCpus=1) of the executor for updating the free 
cores in ExecutorData:
{code:java}
val rpId = executorData.resourceProfileId
val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorData.freeCores -= taskCpus {code}
which results in inconsistent accounting of the available cores.
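The mismatch is easy to see in isolation. Below is a small self-contained sketch contrasting the two bookkeeping paths; the variable names are simplified assumptions, not the real ExecutorData/ResourceProfile fields: scheduling checks the task profile's 3 cpus while the free-core update charges the default profile's 1 cpu.
{code:scala}
// Simplified reproduction of the over-scheduling; names are illustrative only.
object OverScheduleMath {
  def main(args: Array[String]): Unit = {
    val executorCores = 4
    val taskCpusFromTaskSetProfile = 3  // TaskResourceProfile used to decide whether to launch
    val taskCpusFromExecutorProfile = 1 // default profile used when updating freeCores

    var freeCores = executorCores
    var launched = 0

    // The scheduler checks against the taskset profile but charges the executor profile.
    while (freeCores >= taskCpusFromTaskSetProfile) {
      launched += 1
      freeCores -= taskCpusFromExecutorProfile // should subtract 3, subtracts 1 instead
    }

    // Prints "launched 2 tasks" even though 2 * 3 cores > 4 cores: over-scheduling.
    println(s"launched $launched tasks, freeCores left = $freeCores")
  }
}
{code}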






[jira] [Updated] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled

2023-01-02 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-39853:
-
Fix Version/s: 3.4.0

> Support stage level schedule for standalone cluster when dynamic allocation 
> is disabled
> ---
>
> Key: SPARK-39853
> URL: https://issues.apache.org/jira/browse/SPARK-39853
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: huangtengfei
>Assignee: huangtengfei
>Priority: Major
> Fix For: 3.4.0
>
>
> [SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added stage 
> level scheduling support for standalone clusters when dynamic allocation is 
> enabled: Spark requests executors for the different resource profiles.
> When dynamic allocation is disabled, we can also leverage stage level 
> scheduling to schedule tasks, based on their resource profiles (task resource 
> requests), onto executors with the default resource profile.






[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971
 ] 

wuyi edited comment on SPARK-41497 at 12/14/22 7:31 AM:


I'm wondering if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

If no accumulator (probably external only) values changed after the rdd 
computation, then the rdd's cache should be marked as usable/visible no matter 
whether the task succeeds or fails;

If any accumulator values changed after the rdd computation, then the rdd's 
cache should be marked as usable/visible only when the task succeeds.

(let me think further and see if it's doable..)
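As a thought experiment, that condition can be written down roughly as follows; this is only a hedged sketch, and the accumulator-to-RDD bookkeeping it takes for granted is exactly the open part of the proposal:
{code:scala}
// Sketch of the proposed cache visibility rule; all types and fields are assumptions.
object CacheVisibilityRule {
  case class TaskOutcome(succeeded: Boolean, changedExternalAccums: Boolean)

  // An rdd cache block written by a task attempt becomes visible/usable iff the
  // attempt touched no (external) accumulators while computing it, or it succeeded.
  def cacheBlockVisible(outcome: TaskOutcome): Boolean =
    !outcome.changedExternalAccums || outcome.succeeded

  def main(args: Array[String]): Unit = {
    // failed attempt, no accumulators touched -> reusable (no undercounting risk)
    println(cacheBlockVisible(TaskOutcome(succeeded = false, changedExternalAccums = false)))
    // failed attempt, accumulators touched -> not reusable (retry recomputes and re-adds)
    println(cacheBlockVisible(TaskOutcome(succeeded = false, changedExternalAccums = true)))
    // successful attempt -> reusable either way
    println(cacheBlockVisible(TaskOutcome(succeeded = true, changedExternalAccums = true)))
  }
}
{code}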


was (Author: ngone51):
I'm thinking if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

 

if there're no accumulators (external only probably) values changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fail.

 

(let me think further and see if it's doable..)

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971
 ] 

wuyi commented on SPARK-41497:
--

I'm thinking if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

 

If no accumulator (probably external only) values changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fails.

 

(let me think further and see if it's doable..)

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )






[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646969#comment-17646969
 ] 

wuyi commented on SPARK-41497:
--

> do we have a way to do that ?

 

[~mridulm80]  Currently, we only have the mapping between the task and 
accumulators. Accumulators are registered to the task via TaskContext.get() 
when they are deserialized on the executor.

If we had a way to know which RDD scope an accumulator belongs to when 
deserializing, we could then set up the mapping between the RDD and its 
accumulators. This is probably the most difficult part.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )




[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646919#comment-17646919
 ] 

wuyi commented on SPARK-41497:
--

[~mridulm80]  For b) and c), shouldn't we allow T2 to use the result of T1's 
cache if rdd1's computation doesn't include any accumulators?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )






[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646639#comment-17646639
 ] 

wuyi commented on SPARK-41497:
--

[~mridulm80] Sounds like a better idea than option 4. But I think it still 
doesn't work well for a case like this:

For example, a task is constructed as `rdd1.cache().rdd2`. So if the task fails 
during rdd2's computation, I think rdd1's cache should still be reusable.

 

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed in its computation
>   // (accumulator accounting, result caching) but fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-41497:
-
Description: 
Accumulator could be undercounted when the retried task has an rdd cache. See the 
example below; you can also find the complete, reproducible example at 
[https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

  
{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task attempt of
  // the job submitted below. In particular, the failed first attempt would succeed in
  // its computation (accumulator accounting, result caching) but fail to report its
  // success status due to a concurrent executor loss. The second task attempt would
  // succeed.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Create an rdd with only one partition so there's only one task, and specify the
  // storage level MEMORY_ONLY_2 so that the rdd result will be cached on both executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
    myAcc.add(100)
    iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will pass since the second task attempt will succeed
  assert(rdd.count() === 10)
  // This will fail because `myAcc.add(100)` won't be executed during the second task
  // attempt: the second attempt loads the rdd cache directly instead of executing the
  // task function, so `myAcc.add(100)` is skipped.
  assert(myAcc.value === 100)
} {code}
 

We could also hit this issue with decommission even if the rdd only has one 
copy. For example, decommission could migrate the rdd cache block to another 
executor (which effectively gives the same situation as 2 copies) and the 
decommissioned executor could be lost before the task reports its success 
status to the driver. 

 

The issue is a bit more complicated to fix than expected. I have tried several 
fixes, but none of them is ideal:

Option 1: Clean up any rdd cache related to the failed task: in practice, this 
option already fixes the issue in most cases. However, theoretically, the rdd 
cache could be reported to the driver right after the driver cleans up the 
failed task's caches, due to asynchronous communication. So this option can't 
resolve the issue thoroughly;

Option 2: Disallow rdd cache reuse across task attempts for the same task: this 
option can 100% fix the issue. The problem is that it also affects cases where 
the rdd cache could safely be reused across attempts (e.g., when there is no 
accumulator operation in the task), which can cause a performance regression;

Option 3: Introduce an accumulator cache: first, this requires a new framework 
for supporting an accumulator cache; second, the driver should improve its logic 
to distinguish whether the cached accumulator value should be reported to the 
user, to avoid overcounting. For example, in the case of task retry, the value 
should be reported. However, in the case of rdd cache reuse, the value 
shouldn't be reported (should it?);

Option 4: Validate task success when a task tries to load the rdd cache: this 
defines an rdd cache as valid/accessible only if the producing task has 
succeeded. This could be either overkill or a bit complex (because currently 
Spark cleans up the task state once it's finished, so we would need to 
maintain a structure that records whether a task once succeeded).

  was:
Accumulator could be undercounted when the retried task has rdd cache.  See the 
example below and you could also find the completed and reproducible example at 
[https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

  
{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task 
attempt of the job
  // submitted below. In particular, the failed first attempt task would 
success on computation
  // (accumulator accounting, result caching) but only fail to report its 
success status due
  // to the concurrent executor lost. The second task attempt would success.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Initiate a rdd with only one partition so there's only one task and 
specify the storage level
  // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
myAcc.add(100)
iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646097#comment-17646097
 ] 

wuyi commented on SPARK-41497:
--

[~mridulm80] [~tgraves] [~attilapiros] [~ivoson] any good ideas?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has an rdd cache. See 
> the example below; you can also find the complete, reproducible example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task attempt of
>   // the job submitted below. In particular, the failed first attempt would succeed in
>   // its computation (accumulator accounting, result caching) but fail to report its
>   // success status due to a concurrent executor loss. The second task attempt would
>   // succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and specify the
>   // storage level MEMORY_ONLY_2 so that the rdd result will be cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the second task
>   // attempt: the second attempt loads the rdd cache directly instead of executing the
>   // task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (which effectively gives the same situation as 2 copies) and the 
> decommissioned executor could be lost before the task reports its success 
> status to the driver. 
>  
> The issue is a bit more complicated to fix than expected. I have tried several 
> fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, this 
> option already fixes the issue in most cases. However, theoretically, the rdd 
> cache could be reported to the driver right after the driver cleans up the 
> failed task's caches, due to asynchronous communication. So this option can't 
> resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task: this 
> option can 100% fix the issue. The problem is that it also affects cases where 
> the rdd cache could safely be reused across attempts (e.g., when there is no 
> accumulator operation in the task), which can cause a performance regression;
> Option 3: Introduce an accumulator cache: first, this requires a new framework 
> for supporting an accumulator cache; second, the driver should improve its 
> logic to distinguish whether the cached accumulator value should be reported 
> to the user, to avoid overcounting. For example, in the case of task retry, 
> the value should be reported. However, in the case of rdd cache reuse, the 
> value shouldn't be reported (should it?);
> Option 4: Validate task success when a task tries to load the rdd cache: this 
> defines an rdd cache as valid/accessible only if the producing task has 
> succeeded. This could be either overkill or a bit complex (because currently 
> Spark cleans up the task state once it's finished, so we would need to 
> maintain a structure that records whether a task once succeeded).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-41497:
-
Description: 
Accumulator could be undercounted when the retried task has an rdd cache. See the 
example below; you can also find the complete, reproducible example at 
[https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

  
{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task attempt of
  // the job submitted below. In particular, the failed first attempt would succeed in
  // its computation (accumulator accounting, result caching) but fail to report its
  // success status due to a concurrent executor loss. The second task attempt would
  // succeed.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Create an rdd with only one partition so there's only one task, and specify the
  // storage level MEMORY_ONLY_2 so that the rdd result will be cached on both executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
    myAcc.add(100)
    iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will pass since the second task attempt will succeed
  assert(rdd.count() === 10)
  // This will fail because `myAcc.add(100)` won't be executed during the second task
  // attempt: the second attempt loads the rdd cache directly instead of executing the
  // task function, so `myAcc.add(100)` is skipped.
  assert(myAcc.value === 100)
} {code}
 

We could also hit this issue with decommission even if the rdd only has one 
copy. For example, decommission could migrate the rdd cache block to another 
executor (which effectively gives the same situation as 2 copies) and the 
decommissioned executor could be lost before the task reports its success 
status to the driver. 

 

The issue is a bit more complicated to fix than expected. I have tried several 
fixes, but none of them is ideal:

Option 1: Clean up any rdd cache related to the failed task: in practice, this 
option already fixes the issue in most cases. However, theoretically, the rdd 
cache could be reported to the driver right after the driver cleans up the 
failed task's caches, due to asynchronous communication. So this option can't 
resolve the issue thoroughly;

Option 2: Disallow rdd cache reuse across task attempts for the same task: this 
option can 100% fix the issue. The problem is that it also affects cases where 
the rdd cache could safely be reused across attempts (e.g., when there is no 
accumulator operation in the task), which can cause a performance regression;

Option 3: Introduce an accumulator cache: first, this requires a new framework 
for supporting an accumulator cache; second, the driver should improve its logic 
to distinguish whether the cached accumulator value should be reported to the 
user, to avoid overcounting. For example, in the case of task retry, the value 
should be reported. However, in the case of rdd cache reuse, the value 
shouldn't be reported (should it?);

Option 4: Validate task success when a task tries to load the rdd cache: this 
defines an rdd cache as valid/accessible only if the producing task has 
succeeded. This could be either overkill or a bit complex (because currently 
Spark cleans up the task state once it's finished, so we would need to 
maintain a structure that records whether a task once succeeded).

  was:Accumulator could be undercounted when the retried task has rdd cached.


> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to 

[jira] [Created] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread wuyi (Jira)
wuyi created SPARK-41497:


 Summary: Accumulator undercounting in the case of retry task with 
rdd cache
 Key: SPARK-41497
 URL: https://issues.apache.org/jira/browse/SPARK-41497
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi


Accumulator could be undercounted when the retried task has rdd cached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41469) Task rerun on decommissioned executor can be avoided if shuffle data has migrated

2022-12-09 Thread wuyi (Jira)
wuyi created SPARK-41469:


 Summary: Task rerun on decommissioned executor can be avoided if 
shuffle data has migrated
 Key: SPARK-41469
 URL: https://issues.apache.org/jira/browse/SPARK-41469
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3
Reporter: wuyi


Currently, we always rerun a finished shuffle map task if it once ran on the 
lost executor. However, when the executor loss is caused by decommission, the 
shuffle data might already have been migrated, so the task doesn't need to 
rerun.
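
A minimal, self-contained sketch of the intended check (hypothetical types, not the 
DAGScheduler API): only map outputs that are still registered on the lost executor 
need to be recomputed; outputs that were migrated during decommission keep their new 
location.

{code:scala}
// Hypothetical model: decide which map tasks to rerun after an executor loss.
object RerunOnExecutorLossSketch {
  final case class MapStatus(mapId: Long, location: String) // location = executor id

  def tasksToRerun(lostExecutor: String, outputs: Map[Int, MapStatus]): Seq[Int] =
    outputs.collect {
      // Rerun only if the output is still registered on the lost executor,
      // i.e. it was not migrated away before the loss was observed.
      case (mapIndex, status) if status.location == lostExecutor => mapIndex
    }.toSeq
}
{code}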



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41460) Introduce IsolatedThreadSafeRpcEndpoint to extend IsolatedRpcEndpoint

2022-12-08 Thread wuyi (Jira)
wuyi created SPARK-41460:


 Summary: Introduce IsolatedThreadSafeRpcEndpoint to extend 
IsolatedRpcEndpoint 
 Key: SPARK-41460
 URL: https://issues.apache.org/jira/browse/SPARK-41460
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi


Extending IsolatedRpcEndpoint currently implies that the endpoint is 
thread-safe. However, this is not explicit to developers, so people may 
mistakenly override IsolatedRpcEndpoint#threadCount with a thread count greater 
than 1 and break the thread-safety assumption.
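
One possible shape of the fix, sketched in simplified form (the trait names follow the 
ticket, but the bodies here are illustrative rather than the exact Spark code): a 
dedicated marker trait pins the thread count to 1, so mixing it in both documents and 
enforces single-threaded message delivery.

{code:scala}
// Simplified sketch, not the exact Spark trait hierarchy.
trait IsolatedRpcEndpointSketch {
  // Subclasses may override this to process messages with multiple threads.
  def threadCount: Int = 1
}

trait IsolatedThreadSafeRpcEndpointSketch extends IsolatedRpcEndpointSketch {
  // Pinned: an endpoint mixing this in is guaranteed single-threaded delivery.
  final override def threadCount: Int = 1
}
{code}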



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost

2022-12-01 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-41360:
-
Summary: Avoid BlockManager re-registration if the executor has been lost  
(was: Avoid BlockMananger re-registration if the executor has been lost)

> Avoid BlockManager re-registration if the executor has been lost
> 
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> We should avoid block manager re-registration if the executor has already been 
> lost, as it's meaningless and harmful; see, e.g., SPARK-35011



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41360) Avoid BlockMananger re-registration if the executor has been lost

2022-12-01 Thread wuyi (Jira)
wuyi created SPARK-41360:


 Summary: Avoid BlockMananger re-registration if the executor has 
been lost
 Key: SPARK-41360
 URL: https://issues.apache.org/jira/browse/SPARK-41360
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi


We should avoid block manager re-registration if the executor has already been 
lost, as it's meaningless and harmful; see, e.g., SPARK-35011



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35011) False active executor in UI that caused by BlockManager reregistration

2022-12-01 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-35011:
-
Summary: False active executor in UI that caused by BlockManager 
reregistration  (was: Avoid Block Manager registerations when StopExecutor msg 
is in-flight.)

> False active executor in UI that caused by BlockManager reregistration
> --
>
> Key: SPARK-35011
> URL: https://issues.apache.org/jira/browse/SPARK-35011
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Sumeet
>Assignee: wuyi
>Priority: Major
>  Labels: BlockManager, core
> Fix For: 3.3.0
>
>
> *Note:* This is a follow-up on SPARK-34949; even after the heartbeat fix, the 
> driver reports dead executors as alive.
> *Problem:*
> I was testing Dynamic Allocation on K8s with about 300 executors. When the 
> executors were torn down due to "spark.dynamicAllocation.executorIdleTimeout", 
> I noticed all the executor pods being removed from K8s; however, under the 
> "Executors" tab in the Spark UI, some executors were still listed as alive. 
> [spark.sparkContext.statusTracker.getExecutorInfos.length|https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L100]
>  also returned a value greater than 1. 
>  
> *Cause:*
>  * "CoarseGrainedSchedulerBackend" issues async "StopExecutor" on 
> executorEndpoint
>  * "CoarseGrainedSchedulerBackend" removes that executor from Driver's 
> internal data structures and publishes "SparkListenerExecutorRemoved" on the 
> "listenerBus".
>  * Executor has still not processed "StopExecutor" from the Driver
>  * Driver receives heartbeat from the Executor, since it cannot find the 
> "executorId" in its data structures, it responds with 
> "HeartbeatResponse(reregisterBlockManager = true)"
>  * "BlockManager" on the Executor reregisters with the "BlockManagerMaster" 
> and "SparkListenerBlockManagerAdded" is published on the "listenerBus"
>  * Executor starts processing the "StopExecutor" and exits
>  * "AppStatusListener" picks the "SparkListenerBlockManagerAdded" event and 
> updates "AppStatusStore"
>  * "statusTracker.getExecutorInfos" refers "AppStatusStore" to get the list 
> of executors which returns the dead executor as alive.
>  
> *Proposed Solution:*
> Maintain a cache of recently removed executors on the Driver. During 
> registration in BlockManagerMasterEndpoint, if the BlockManager belongs to a 
> recently removed executor, return None to indicate the registration is ignored, 
> since the executor will be shutting down soon.
> On BlockManagerHeartbeat, if the BlockManager belongs to a recently removed 
> executor, return true to indicate that the driver knows about it, thereby 
> preventing re-registration.
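
A self-contained sketch of the proposed bookkeeping (a hypothetical helper, not the 
real BlockManagerMasterEndpoint): registrations and heartbeats from a recently removed 
executor are ignored so a dead executor is never resurrected in the driver's state.

{code:scala}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of the "recently removed executors" cache on the driver.
object ReRegistrationSketch {
  private val recentlyRemoved = ConcurrentHashMap.newKeySet[String]() // executor ids

  def onExecutorRemoved(execId: String): Unit = recentlyRemoved.add(execId)

  // Returns None to signal "registration ignored; the executor is shutting down soon".
  def registerBlockManager(execId: String): Option[String] =
    if (recentlyRemoved.contains(execId)) None else Some(execId)

  // Returning true pretends the driver already knows this block manager,
  // which prevents the executor from re-registering on heartbeat.
  def heartbeat(execId: String, liveBlockManagers: Set[String]): Boolean =
    recentlyRemoved.contains(execId) || liveBlockManagers.contains(execId)
}
{code}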



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40596) Populate ExecutorDecommission with more informative messages

2022-10-10 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-40596.
--
  Assignee: Bo Zhang
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/38030

> Populate ExecutorDecommission with more informative messages
> 
>
> Key: SPARK-40596
> URL: https://issues.apache.org/jira/browse/SPARK-40596
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Bo Zhang
>Assignee: Bo Zhang
>Priority: Major
>
> Currently the message in {{ExecutorDecommission}} is a fixed value, 
> {{"Executor decommission."}}, and it is the same for all cases, including 
> spot instance interruptions and auto-scaling down. We should put a detailed 
> message in {{ExecutorDecommission}} to better differentiate those cases.
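
For illustration only, a sketch of the shape of the change (the names are illustrative, 
not the exact Spark classes): carry a per-case reason string instead of a single fixed 
message.

{code:scala}
// Illustrative only; not the actual Spark ExecutorLossReason hierarchy.
sealed trait ExecutorLossReasonSketch { def message: String }

final case class ExecutorDecommissionSketch(message: String)
  extends ExecutorLossReasonSketch

// e.g. ExecutorDecommissionSketch("spot instance interruption")
//      ExecutorDecommissionSketch("cluster scale-down due to idle timeout")
{code}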



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled

2022-09-29 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-39853.
--
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/37268

> Support stage level schedule for standalone cluster when dynamic allocation 
> is disabled
> ---
>
> Key: SPARK-39853
> URL: https://issues.apache.org/jira/browse/SPARK-39853
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: huangtengfei
>Assignee: huangtengfei
>Priority: Major
>
> [SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added stage 
> level scheduling support for standalone clusters when dynamic allocation is 
> enabled; in that case Spark requests executors for different resource profiles.
> When dynamic allocation is disabled, we can also leverage stage level 
> scheduling to schedule tasks, based on their resource profile (task resource 
> requests), onto executors with the default resource profile.
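
For context, this is roughly how the stage-level scheduling API is used from an 
application (assuming an existing SparkContext `sc`; the exact behaviour with dynamic 
allocation disabled on standalone mode is what this ticket enables):

{code:scala}
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Declare per-task requirements and build a resource profile for one stage.
val taskReqs = new TaskResourceRequests().cpus(2) // each task needs 2 CPU cores
val profile = new ResourceProfileBuilder().require(taskReqs).build()

// Tasks of the stage(s) computing this RDD are scheduled according to the profile,
// here onto executors with the default resource profile when dynamic allocation is off.
val result = sc.parallelize(1 to 1000, 10)
  .withResources(profile)
  .map(_ * 2)
  .count()
{code}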



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled

2022-09-29 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-39853:


Assignee: huangtengfei

> Support stage level schedule for standalone cluster when dynamic allocation 
> is disabled
> ---
>
> Key: SPARK-39853
> URL: https://issues.apache.org/jira/browse/SPARK-39853
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: huangtengfei
>Assignee: huangtengfei
>Priority: Major
>
> [SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added stage 
> level scheduling support for standalone clusters when dynamic allocation is 
> enabled; in that case Spark requests executors for different resource profiles.
> When dynamic allocation is disabled, we can also leverage stage level 
> scheduling to schedule tasks, based on their resource profile (task resource 
> requests), onto executors with the default resource profile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung

2022-09-22 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17608268#comment-17608268
 ] 

wuyi commented on SPARK-40320:
--

I see. Thanks for the explanation. 

> When the Executor plugin fails to initialize, the Executor shows active but 
> does not accept tasks forever, just like being hung
> ---
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.0.0
>Reporter: Mars
>Priority: Major
>
> *Reproduce steps:*
> Set `spark.plugins=ErrorSparkPlugin`.
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (I 
> abbreviated the code to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   /**
>*/
>   override def driverPlugin(): DriverPlugin =  new ErrorDriverPlugin()
>   /**
>*/
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, 
> String]): Unit = {
> if (checkingInterval == 1) {
>   throw new UnsatisfiedLinkError("My Exception error")
> }
>   }
> } {code}
> The executor shows as active when we check the Spark UI; however, it is broken 
> and doesn't receive any tasks.
> *Root Cause:*
> I checked the code and found that in `org.apache.spark.rpc.netty.Inbox#safelyCall` 
> the fatal error (`UnsatisfiedLinkError` is a fatal error) is thrown in the method 
> `dealWithFatalError`. The `CoarseGrainedExecutorBackend` JVM process is still 
> active, but the communication thread is no longer working (see 
> `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the executor 
> doesn't receive any messages).
> Some ideas:
> I think it is very hard to know what happened here unless we check the code. 
> The executor is active but it can't do anything, so we are left wondering 
> whether the driver is broken or the executor has a problem. At the least, the 
> executor status shouldn't be active here, or the executor could call 
> exitExecutor (kill itself).
>  
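
A self-contained illustration of the failure mode (a toy model, not Spark's MessageLoop 
or Inbox): when a fatal Error escapes the single receive-loop thread, the process stays 
alive but silently stops serving messages, unless the loop fails fast and exits.

{code:scala}
import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Toy model of a single-threaded receive loop dying on a fatal error.
object ReceiveLoopSketch {
  private val inbox = new LinkedBlockingQueue[String]()
  private val pool = Executors.newSingleThreadExecutor()

  def start(failFast: Boolean): Unit = pool.execute { () =>
    try {
      while (true) {
        val msg = inbox.take()
        if (msg == "init-plugin") throw new UnsatisfiedLinkError("plugin init failed")
        println(s"handled $msg")
      }
    } catch {
      // Without this fail-fast branch the thread just dies and the JVM keeps
      // running as a seemingly "active" executor that handles nothing.
      case e: Throwable if failFast =>
        println(s"fatal: ${e.getMessage}, exiting")
        sys.exit(1)
    }
  }

  def send(msg: String): Unit = inbox.put(msg)
}
{code}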



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung

2022-09-06 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600777#comment-17600777
 ] 

wuyi commented on SPARK-40320:
--

> Actually the  `CoarseGrainedExecutorBackend` JVM process  is active but the  
> communication thread is no longer working ( please see  
> `MessageLoop#receiveLoopRunnable` , `receiveLoop()` was broken, so executor 
> doesn't receive any message)

Hmm, shouldn't it bring up a new `receiveLoop()` to serve RPC messages, 
according to 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89]
 ? Why doesn't it?

 

Besides, why doesn't SparkUncaughtExceptionHandler catch the fatal error?

 

cc [~tgraves] [~mridulm80] 

> When the Executor plugin fails to initialize, the Executor shows active but 
> does not accept tasks forever, just like being hung
> ---
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.0.0
>Reporter: Mars
>Priority: Major
>
> *Reproduce steps:*
> Set `spark.plugins=ErrorSparkPlugin`.
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (I 
> abbreviated the code to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   /**
>*/
>   override def driverPlugin(): DriverPlugin =  new ErrorDriverPlugin()
>   /**
>*/
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, 
> String]): Unit = {
> if (checkingInterval == 1) {
>   throw new UnsatisfiedLinkError("My Exception error")
> }
>   }
> } {code}
> The executor shows as active when we check the Spark UI; however, it is broken 
> and doesn't receive any tasks.
> *Root Cause:*
> I checked the code and found that in `org.apache.spark.rpc.netty.Inbox#safelyCall` 
> the fatal error (`UnsatisfiedLinkError` is a fatal error) is thrown in the method 
> `dealWithFatalError`. The `CoarseGrainedExecutorBackend` JVM process is still 
> active, but the communication thread is no longer working (see 
> `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the executor 
> doesn't receive any messages).
> Some ideas:
> I think it is very hard to know what happened here unless we check the code. 
> The executor is active but it can't do anything, so we are left wondering 
> whether the driver is broken or the executor has a problem. At the least, the 
> executor status shouldn't be active here, or the executor could call 
> exitExecutor (kill itself).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode

2022-08-24 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-39957:


Assignee: Kai-Hsun Chen

> Delay onDisconnected to enable Driver receives ExecutorExitCode
> ---
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kai-Hsun Chen
>Assignee: Kai-Hsun Chen
>Priority: Major
>
> There are two methods to detect executor loss. First, when an RPC fails, the 
> function {{onDisconnected}} will be triggered. Second, when an executor exits 
> with an ExecutorExitCode, the exit code will be passed from the ExecutorRunner 
> to the Driver. These two methods may reach different conclusions for the same 
> case. We hope to categorize the ExecutorLossReason by ExecutorExitCode. This 
> PR aims to make sure the Driver receives the ExecutorExitCode before 
> onDisconnected is called.
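
A hypothetical, self-contained sketch of the idea (not the actual scheduler code; the 
names and the 5-second delay are made up): when a disconnect is observed, delay the 
classification briefly so that an ExecutorExitCode-based reason wins if it arrives in 
time.

{code:scala}
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical model: prefer the exit-code-based loss reason over a bare disconnect.
object LossReasonSketch {
  private val exitCodes = new ConcurrentHashMap[String, Integer]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def onExecutorExit(execId: String, code: Int): Unit =
    exitCodes.put(execId, Integer.valueOf(code))

  def onDisconnected(execId: String)(report: String => Unit): Unit = {
    val classify: Runnable = () => {
      val reason = Option(exitCodes.get(execId))
        .map(code => s"executor $execId exited with code $code")
        .getOrElse(s"executor $execId disconnected (reason unknown)")
      report(reason)
    }
    // A short delay gives the ExecutorExitCode a chance to arrive first.
    scheduler.schedule(classify, 5, TimeUnit.SECONDS)
  }
}
{code}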



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode

2022-08-24 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-39957.
--
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/37400

> Delay onDisconnected to enable Driver receives ExecutorExitCode
> ---
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kai-Hsun Chen
>Assignee: Kai-Hsun Chen
>Priority: Major
>
> There are two methods to detect executor loss. First, when an RPC fails, the 
> function {{onDisconnected}} will be triggered. Second, when an executor exits 
> with an ExecutorExitCode, the exit code will be passed from the ExecutorRunner 
> to the Driver. These two methods may reach different conclusions for the same 
> case. We hope to categorize the ExecutorLossReason by ExecutorExitCode. This 
> PR aims to make sure the Driver receives the ExecutorExitCode before 
> onDisconnected is called.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode

2022-08-24 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-39957:
-
Fix Version/s: 3.4.0

> Delay onDisconnected to enable Driver receives ExecutorExitCode
> ---
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kai-Hsun Chen
>Assignee: Kai-Hsun Chen
>Priority: Major
> Fix For: 3.4.0
>
>
> There are two methods to detect executor loss. First, when an RPC fails, the 
> function {{onDisconnected}} will be triggered. Second, when an executor exits 
> with an ExecutorExitCode, the exit code will be passed from the ExecutorRunner 
> to the Driver. These two methods may reach different conclusions for the same 
> case. We hope to categorize the ExecutorLossReason by ExecutorExitCode. This 
> PR aims to make sure the Driver receives the ExecutorExitCode before 
> onDisconnected is called.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34788) Spark throws FileNotFoundException instead of IOException when disk is full

2022-07-28 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572506#comment-17572506
 ] 

wuyi commented on SPARK-34788:
--

> Why don't you ensure enough disks from the beginning?

On the write side, since Spark usually uses iterators to write data to disk, 
it's not possible to know the size in advance.

On the read side (the error in this ticket is actually raised when reading a 
file), it seems overkill to me to check whether the disk is full every time 
before reading a file.

 

> This goal of this Jira issue seems to aim `cosmetic` change which has not 
> much benefit.

Yea, this won't fix the root cause, but it gives users a better error message.
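
To make the "better error message" idea concrete, here is a hedged sketch (a 
hypothetical helper, not Spark's FileSegmentManagedBuffer): when opening the file 
fails, check the free space of the parent directory and surface a disk-full hint 
instead of a bare FileNotFoundException.

{code:scala}
import java.io.{File, FileInputStream, FileNotFoundException, IOException}

// Hypothetical helper: wrap the confusing FileNotFoundException with a hint
// when the underlying disk appears to be full.
object DiskFullHintSketch {
  def open(file: File): FileInputStream =
    try new FileInputStream(file)
    catch {
      case e: FileNotFoundException
          if Option(file.getParentFile).exists(_.getUsableSpace == 0L) =>
        throw new IOException(
          s"Failed to open ${file.getPath}; the disk holding it appears to be full", e)
    }
}
{code}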

> Spark throws FileNotFoundException instead of IOException when disk is full
> ---
>
> Key: SPARK-34788
> URL: https://issues.apache.org/jira/browse/SPARK-34788
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: wuyi
>Priority: Major
>
> When the disk is full, Spark throws a FileNotFoundException instead of an 
> IOException with a disk-full hint. It's quite a confusing error for users:
> {code:java}
> 9/03/26 09:03:45 ERROR ShuffleBlockFetcherIterator: Failed to create input 
> stream from local block
> java.io.IOException: Error in reading 
> FileSegmentManagedBuffer{file=/local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data,
>  offset=110254956, length=1875458}
>   at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:111)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:442)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)
>   at 
> org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:98)
>   at 
> org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:95)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>   at org.apache.spark.scheduler.Task.run(Task.scala:112)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> 

[jira] [Commented] (SPARK-34788) Spark throws FileNotFoundException instead of IOException when disk is full

2022-07-28 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572478#comment-17572478
 ] 

wuyi commented on SPARK-34788:
--

[~leo wen] are you able to reproduce in your environment? If yes, we could 
verify the proposal 
([http://weblog.janek.org/Archive/2004/12/20/ExceptionWhenWritingToAFu.html] ).

> Spark throws FileNotFoundException instead of IOException when disk is full
> ---
>
> Key: SPARK-34788
> URL: https://issues.apache.org/jira/browse/SPARK-34788
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: wuyi
>Priority: Major
>
> When the disk is full, Spark throws a FileNotFoundException instead of an 
> IOException with a disk-full hint. It's quite a confusing error for users:
> {code:java}
> 9/03/26 09:03:45 ERROR ShuffleBlockFetcherIterator: Failed to create input 
> stream from local block
> java.io.IOException: Error in reading 
> FileSegmentManagedBuffer{file=/local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data,
>  offset=110254956, length=1875458}
>   at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:111)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:442)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)
>   at 
> org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:98)
>   at 
> org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:95)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>   at org.apache.spark.scheduler.Task.run(Task.scala:112)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: 
> /local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data
>  (No such file or directory)
>   at 

[jira] [Resolved] (SPARK-39062) Add Standalone backend support for Stage Level Scheduling

2022-07-04 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-39062.
--
Fix Version/s: 3.4.0
 Assignee: huangtengfei
   Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36716

> Add Standalone backend support for Stage Level Scheduling
> -
>
> Key: SPARK-39062
> URL: https://issues.apache.org/jira/browse/SPARK-39062
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: Xingbo Jiang
>Assignee: huangtengfei
>Priority: Major
> Fix For: 3.4.0
>
>
> We should add Standalone backend support for Stage Level Scheduling:
> * The Master should be able to generate executors for multiple 
> ResourceProfiles; currently it only considers available CPUs;
> * The Worker needs to let the executor know about its ResourceProfile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32170) Improve the speculation for the inefficient tasks by the task metrics.

2022-06-28 Thread wuyi (Jira)
wuyi updated SPARK-32170:
-
Fix Version/s: 3.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)



[jira] [Resolved] (SPARK-32170) Improve the speculation for the inefficient tasks by the task metrics.

2022-06-28 Thread wuyi (Jira)
wuyi resolved SPARK-32170.
--
  Assignee: weixiuli
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36162



--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)



[jira] [Resolved] (SPARK-39152) StreamCorruptedException cause job failure for disk persisted RDD

2022-06-20 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-39152.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36512

> StreamCorruptedException cause job failure for disk persisted RDD
> -
>
> Key: SPARK-39152
> URL: https://issues.apache.org/jira/browse/SPARK-39152
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager
>Affects Versions: 3.4.0
>Reporter: Attila Zsolt Piros
>Assignee: Attila Zsolt Piros
>Priority: Major
> Fix For: 3.4.0
>
>
> In case of disk corruption, a disk-persisted RDD block will lead to job 
> failure because the block registration always points to the same file. So even 
> when the task is rescheduled on a different executor, the job will fail.
> *Example*
> First failure (the block is locally available):
> {noformat}
> 22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 
> 12.0 (TID 51853)
> java.io.StreamCorruptedException: invalid stream header: 
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
>   at java.io.ObjectInputStream.(ObjectInputStream.java:401)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.(JavaSerializer.scala:63)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.(JavaSerializer.scala:63)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>   at 
> org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> {noformat}
> Then the task might be rescheduled on a different executor, but since the block 
> is registered to the first block manager, the error will be the same:
> {noformat}
> java.io.StreamCorruptedException: invalid stream header: 
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
>   at java.io.ObjectInputStream.(ObjectInputStream.java:401)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.(JavaSerializer.scala:63)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.(JavaSerializer.scala:63)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>   at 
> org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
> {noformat}
> My idea is to retry the IO operations a few times and, when all of them have 
> failed, deregister the block and let the following task recompute it.
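
A hypothetical sketch of the proposed handling (not the BlockManager API; the helper 
and its parameters are made up): retry the read a few times and, if it keeps failing, 
drop the block registration so the next attempt recomputes the partition instead of 
failing the job on the same corrupted file forever.

{code:scala}
// Hypothetical retry-then-deregister helper for a corrupted cached block.
object CorruptBlockSketch {
  def readWithRetry[T](blockId: String, maxAttempts: Int = 3)(read: () => T)(
      deregister: String => Unit): Option[T] = {
    var attempt = 0
    while (attempt < maxAttempts) {
      try return Some(read())
      catch {
        // StreamCorruptedException is an IOException, so it is retried here.
        case _: java.io.IOException => attempt += 1
      }
    }
    deregister(blockId) // give up on the cached copy; the caller recomputes it
    None
  }
}
{code}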



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-38683) It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection

2022-03-31 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-38683.
--
Fix Version/s: 3.4.0
 Assignee: weixiuli
   Resolution: Fixed

Issue resolved by [https://github.com/apache/spark/pull/36000]

> It is unnecessary to release the ShuffleManagedBufferIterator or 
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the 
> client channel's connection is terminated
> --
>
> Key: SPARK-38683
> URL: https://issues.apache.org/jira/browse/SPARK-38683
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
>Reporter: weixiuli
>Assignee: weixiuli
>Priority: Major
> Fix For: 3.4.0
>
>
>  It is unnecessary to release the ShuffleManagedBufferIterator, 
> ShuffleChunkManagedBufferIterator, or ManagedBufferIterator buffers when the 
> client channel's connection is terminated; skipping the release reduces I/O 
> operations and improves performance for the External Shuffle Service.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38683) It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection

2022-03-31 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-38683:
-
Issue Type: Improvement  (was: Bug)

> It is unnecessary to release the ShuffleManagedBufferIterator or 
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the 
> client channel's connection is terminated
> --
>
> Key: SPARK-38683
> URL: https://issues.apache.org/jira/browse/SPARK-38683
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
>Reporter: weixiuli
>Priority: Major
>
>  It is unnecessary to release the ShuffleManagedBufferIterator, 
> ShuffleChunkManagedBufferIterator, or ManagedBufferIterator buffers when the 
> client channel's connection is terminated; skipping the release reduces I/O 
> operations and improves performance for the External Shuffle Service.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38468) Use error classes in org.apache.spark.metrics

2022-03-15 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506754#comment-17506754
 ] 

wuyi commented on SPARK-38468:
--

Shall we close this one? It seems to be a duplicate of 
https://issues.apache.org/jira/browse/SPARK-38312. cc [~bozhang] [~cloud_fan] 

> Use error classes in org.apache.spark.metrics
> -
>
> Key: SPARK-38468
> URL: https://issues.apache.org/jira/browse/SPARK-38468
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Bo Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting

2022-03-14 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506673#comment-17506673
 ] 

wuyi commented on SPARK-37481:
--

The backport fix to 3.1/3.0 is also done.
 

> Disappearance of skipped stages mislead the bug hunting 
> 
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.2.1, 3.3.0
>
>
> With FetchFailedException and Map Stage Retries:
> When rerunning the spark-sql shell with the original SQL in 
> [https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315]
> !https://user-images.githubusercontent.com/8326978/143821530-ff498caa-abce-483d-a24b-315aacf7e0a0.png!
> 1. stage 3 threw FetchFailedException and caused itself and its parent 
> stage (stage 2) to retry
> 2. stage 2 was skipped before but its attemptId was still 0, so when its 
> retry happened it got removed from `Skipped Stages` 
> The DAG of Job 2 no longer shows that stage 2 is skipped.
> !https://user-images.githubusercontent.com/8326978/143824666-6390b64a-a45b-4bc8-b05d-c5abbb28cdef.png!
> Besides, a retried stage usually has a subset of the tasks from the original 
> stage. If we mark it as an original one, the metrics might lead us into 
> pitfalls.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations

2022-02-20 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-38266.
--
Resolution: Fixed

> UnresolvedException: Invalid call to dataType on unresolved object caused by 
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.2
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
> val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
> val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
> val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
> df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
> val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>   .select($"df1.tsStr".as("timeStr")).as("df3")
> // This throws "UnresolvedException: Invalid call to
> // dataType on unresolved object" instead of "AnalysisException: Column 
> // 'df1.timeStr' does not exist."
> df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations

2022-02-20 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-38266:


Assignee: wuyi

> UnresolvedException: Invalid call to dataType on unresolved object caused by 
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.2
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
> val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
> val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
> val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
> df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
> val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>   .select($"df1.tsStr".as("timeStr")).as("df3")
> // This throws "UnresolvedException: Invalid call to
> // dataType on unresolved object" instead of "AnalysisException: Column 
> // 'df1.timeStr' does not exist."
> df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations

2022-02-20 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495285#comment-17495285
 ] 

wuyi commented on SPARK-38266:
--

Issue resolved by https://github.com/apache/spark/pull/35568

> UnresolvedException: Invalid call to dataType on unresolved object caused by 
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.2
>Reporter: wuyi
>Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
> val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
> val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
> val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
> df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
> val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>   .select($"df1.tsStr".as("timeStr")).as("df3")
> // This throws "UnresolvedException: Invalid call to
> // dataType on unresolved object" instead of "AnalysisException: Column 
> // 'df1.timeStr' does not exist."
> df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations

2022-02-20 Thread wuyi (Jira)
wuyi created SPARK-38266:


 Summary: UnresolvedException: Invalid call to dataType on 
unresolved object caused by GetDateFieldOperations
 Key: SPARK-38266
 URL: https://issues.apache.org/jira/browse/SPARK-38266
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2, 3.3.0
Reporter: wuyi


{code:java}
test("GetDateFieldOperations should skip unresolved nodes") {
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
  .select($"df1.tsStr".as("timeStr")).as("df3")
// This throws "UnresolvedException: Invalid call to
// dataType on unresolved object" instead of "AnalysisException: Column 
// 'df1.timeStr' does not exist."
df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
  }
} {code}
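As an illustration of the fix direction only (a hypothetical, self-contained sketch, not Spark's actual analyzer rule), the guard pattern is to check `resolved` before touching `dataType`, so unresolved columns fall through to the normal analysis error instead of throwing UnresolvedException:

{code:scala}
// Expr is a stand-in for Catalyst expressions; only resolved/dataType matter here.
sealed trait Expr { def resolved: Boolean; def dataType: String }

case class UnresolvedAttr(name: String) extends Expr {
  val resolved = false
  def dataType: String =
    throw new IllegalStateException("Invalid call to dataType on unresolved object")
}

case class TimestampAttr(name: String) extends Expr {
  val resolved = true
  val dataType = "timestamp"
}

case class Year(child: Expr) extends Expr {
  def resolved: Boolean = child.resolved
  def dataType: String = "int"
}

object GetDateFieldRewrite {
  // Only look at dataType once the child is resolved; unresolved attributes are
  // left untouched so analysis can later report "column does not exist" instead
  // of blowing up with UnresolvedException.
  def apply(e: Expr): Expr = e match {
    case attr if attr.resolved && attr.dataType == "timestamp" => Year(attr)
    case other => other
  }
}
{code}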



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-37580) Optimize current TaskSetManager abort logic when task failed count reach the threshold

2022-01-18 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-37580.
--
  Assignee: wangshengjie
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34834

> Optimize current TaskSetManager abort logic when task failed count reach the 
> threshold
> --
>
> Key: SPARK-37580
> URL: https://issues.apache.org/jira/browse/SPARK-37580
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: wangshengjie
>Assignee: wangshengjie
>Priority: Major
>
> In a production environment, we found a logic leak in the TaskSetManager 
> abort handling. For example:
> If one task has failed 3 times (the default max failure threshold is 4), and 
> both a retried attempt and a speculative attempt of it are running, then one of 
> these 2 attempts succeeds and tries to cancel the other. But if the executor 
> running the attempt to be cancelled is lost (OOM in our situation), that attempt 
> is marked as failed, and when TaskSetManager handles this failed attempt the 
> task has now failed 4 times, so the stage is aborted and the job fails.
> I created a patch for this bug and will send a pull request soon.
>  
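A minimal sketch of the guard described above (hypothetical class, not the real TaskSetManager): a failure reported for a partition that has already succeeded should not count toward the max-failures threshold that aborts the stage.

{code:scala}
import scala.collection.mutable

final class TaskSetState(numTasks: Int, maxFailures: Int = 4) {
  private val successful = Array.fill(numTasks)(false)
  private val numFailures = mutable.Map.empty[Int, Int].withDefaultValue(0)

  def markSuccessful(index: Int): Unit = successful(index) = true

  /** Returns true if the stage must be aborted. */
  def handleFailedTask(index: Int): Boolean = {
    if (successful(index)) {
      // Late failure of an attempt whose partition already finished
      // (e.g. a cancelled speculative attempt on a lost executor): ignore it.
      false
    } else {
      numFailures(index) += 1
      numFailures(index) >= maxFailures
    }
  }
}
{code}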



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-37695) Skip diagnosis on merged blocks from push-based shuffle

2021-12-22 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-37695.
--
Fix Version/s: 3.2.1
   3.3.0
 Assignee: Cheng Pan
   Resolution: Fixed

Issue resolved by [https://github.com/apache/spark/pull/34961]

> Skip diagnosis on merged blocks from push-based shuffle
> ---
>
> Key: SPARK-37695
> URL: https://issues.apache.org/jira/browse/SPARK-37695
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Assignee: Cheng Pan
>Priority: Major
> Fix For: 3.2.1, 3.3.0
>
>
> Shuffle corruption diagnosis for push-based shuffle hasn't been supported 
> yet. So we should skip diagnosis on merged blocks, otherwise it could fail:
> {code:java}
> 21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
> 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion 
> failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
>   at scala.Predef$.assert(Predef.scala:223)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
>   at 
> org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
>   at scala.Option.map(Option.scala:230)
>   at 
> org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
>   at 
> org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
>   at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
>   at java.io.DataInputStream.readInt(DataInputStream.java:387)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
>   at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>   at org.apache.spark.scheduler.Task.run(Task.scala:136)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at 

[jira] [Updated] (SPARK-37695) Skip diagnosis on merged blocks from push-based shuffle

2021-12-20 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-37695:
-
Description: 
Shuffle 
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: 
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at 
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
at 
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) {code}
 

 

  was:
 
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: 
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at 
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
at 

[jira] [Updated] (SPARK-37695) Skip diagnosis on merged blocks from push-based shuffle

2021-12-20 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-37695:
-
Description: 
Shuffle corruption diagnosis for push-based shuffle hasn't been supported yet. 
So we should skip diagnosis on merged blocks, otherwise it could fail:
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: 
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at 
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
at 
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) {code}
 

 

  was:
Shuffle 
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: 
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at 
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at 

[jira] [Created] (SPARK-37695) Skip diagnosis on merged blocks from push-based shuffle

2021-12-20 Thread wuyi (Jira)
wuyi created SPARK-37695:


 Summary: Skip diagnosis on merged blocks from push-based shuffle
 Key: SPARK-37695
 URL: https://issues.apache.org/jira/browse/SPARK-37695
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.2.0, 3.3.0
Reporter: wuyi


 
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: 
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at 
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
at 
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120)
at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) {code}
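A hedged sketch of the intended skip (hypothetical block-id model, not the real ShuffleBlockFetcherIterator): corruption diagnosis only understands plain shuffle blocks, so chunk blocks produced by push-based (merged) shuffle are skipped instead of tripping the assertion above.

{code:scala}
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
case class ShuffleBlockChunkId(shuffleId: Int, reduceId: Int, chunkId: Int) extends BlockId

object DiagnosisSketch {
  // Diagnosis only handles regular shuffle blocks; merged (push-based) chunk
  // blocks are skipped instead of failing the "Expected ShuffleBlockId" assertion.
  def diagnoseCorruption(blockId: BlockId): String = blockId match {
    case ShuffleBlockId(shuffleId, mapId, reduceId) =>
      s"checksum diagnosis for shuffle $shuffleId, map $mapId, reduce $reduceId"
    case _: ShuffleBlockChunkId =>
      "corruption diagnosis skipped for push-based merged block"
  }
}
{code}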
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37060) Report driver status does not handle response from backup masters

2021-12-15 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-37060:
-
Fix Version/s: 3.1.3
   3.2.1

> Report driver status does not handle response from backup masters
> -
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Mohamadreza Rostami
>Assignee: Mohamadreza Rostami
>Priority: Critical
> Fix For: 3.1.3, 3.2.1, 3.3.0
>
>
> After an improvement in SPARK-31486, the contributor uses the 
> 'asyncSendToMasterAndForwardReply' method instead of 
> 'activeMasterEndpoint.askSync' to get the status of the driver. Since the 
> driver's status is only available on the active master and the 
> 'asyncSendToMasterAndForwardReply' method iterates over all of the masters, we 
> have to handle the responses from the backup masters in the client, which the 
> developer did not consider in the SPARK-31486 change. So drivers running in 
> cluster mode on a cluster with multiple masters are affected by this bug. I 
> created a patch for this bug and will send a pull request soon.
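A minimal sketch of what "handle the responses from the backup masters" means (hypothetical message types, not the real deploy-client protocol): since the request is forwarded to every master, the client must tolerate replies from standby masters and only act on the active master's answer.

{code:scala}
sealed trait DriverStatusReply
case class DriverStatusResponse(found: Boolean, state: Option[String]) extends DriverStatusReply
case object NotActiveMaster extends DriverStatusReply // reply a standby master could give

object DriverStatusClient {
  def onReply(reply: DriverStatusReply): Unit = reply match {
    case DriverStatusResponse(found, state) =>
      // Only the active master can answer authoritatively.
      println(s"driver found=$found state=$state")
    case NotActiveMaster =>
      () // ignore: standby masters cannot know the driver's status
  }
}
{code}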



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-37060) Report driver status does not handle response from backup masters

2021-12-15 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-37060:


Assignee: Mohamadreza Rostami

> Report driver status does not handle response from backup masters
> -
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Mohamadreza Rostami
>Assignee: Mohamadreza Rostami
>Priority: Critical
>
> After an improvement in SPARK-31486, the contributor uses the 
> 'asyncSendToMasterAndForwardReply' method instead of 
> 'activeMasterEndpoint.askSync' to get the status of the driver. Since the 
> driver's status is only available on the active master and the 
> 'asyncSendToMasterAndForwardReply' method iterates over all of the masters, we 
> have to handle the responses from the backup masters in the client, which the 
> developer did not consider in the SPARK-31486 change. So drivers running in 
> cluster mode on a cluster with multiple masters are affected by this bug. I 
> created a patch for this bug and will send a pull request soon.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-37060) Report driver status does not handle response from backup masters

2021-12-15 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-37060.
--
Fix Version/s: 3.3.0
   Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34331

> Report driver status does not handle response from backup masters
> -
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Mohamadreza Rostami
>Assignee: Mohamadreza Rostami
>Priority: Critical
> Fix For: 3.3.0
>
>
> After an improvement in SPARK-31486, the contributor uses the 
> 'asyncSendToMasterAndForwardReply' method instead of 
> 'activeMasterEndpoint.askSync' to get the status of the driver. Since the 
> driver's status is only available on the active master and the 
> 'asyncSendToMasterAndForwardReply' method iterates over all of the masters, we 
> have to handle the responses from the backup masters in the client, which the 
> developer did not consider in the SPARK-31486 change. So drivers running in 
> cluster mode on a cluster with multiple masters are affected by this bug. I 
> created a patch for this bug and will send a pull request soon.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-37300) TaskSchedulerImpl should ignore task finished event if its task was already finished state

2021-12-12 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-37300.
--
Fix Version/s: 3.3.0
 Assignee: hujiahua
   Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34578

> TaskSchedulerImpl should ignore task finished event if its task was already 
> finished state
> --
>
> Key: SPARK-37300
> URL: https://issues.apache.org/jira/browse/SPARK-37300
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: hujiahua
>Assignee: hujiahua
>Priority: Major
> Fix For: 3.3.0
>
>
> `TaskSchedulerImpl` handles task finished events in `handleSuccessfulTask` and 
> `handleFailedTask`, but in some cases the task is already in a finished state, 
> in which case the task finished event should be ignored.
> Case description: 
> When an executor finishes a task of some stage, the driver receives a 
> StatusUpdate event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an 
> ExecutorLost event simultaneously. There is a race condition here, which can 
> leave TaskSetManager.successful and TaskSetManager.tasksSuccessful with wrong 
> results. A more detailed description and discussion can be found at 
> https://issues.apache.org/jira/browse/SPARK-36575 and 
> https://github.com/apache/spark/pull/33872
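A minimal sketch of the guard (hypothetical state tracking, not the real TaskSchedulerImpl): a finish event for a task that is already in a finished state, for example because an earlier ExecutorLost already marked it, is simply dropped.

{code:scala}
import scala.collection.mutable

object TaskEventGuard {
  private val finished = mutable.Set.empty[Long] // TIDs already in a finished state

  def handleTaskFinished(tid: Long)(process: Long => Unit): Unit = synchronized {
    if (finished.add(tid)) process(tid) // the first finish event wins
    else ()                             // duplicate finish event: ignore
  }
}
{code}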



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting

2021-12-12 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458131#comment-17458131
 ] 

wuyi commented on SPARK-37481:
--

Backport fix to 3.1/3.0 is still in progress.

> Disappearance of skipped stages mislead the bug hunting 
> 
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.2.0, 3.3.0
>
>
> With FetchFailedException and Map Stage Retries:
> When rerunning the spark-sql shell with the original SQL in 
> [https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315]
> !https://user-images.githubusercontent.com/8326978/143821530-ff498caa-abce-483d-a24b-315aacf7e0a0.png!
> 1. stage 3 threw FetchFailedException and caused itself and its parent 
> stage (stage 2) to retry
> 2. stage 2 was skipped before but its attemptId was still 0, so when its 
> retry happened it got removed from `Skipped Stages` 
> The DAG of Job 2 no longer shows that stage 2 is skipped.
> !https://user-images.githubusercontent.com/8326978/143824666-6390b64a-a45b-4bc8-b05d-c5abbb28cdef.png!
> Besides, a retried stage usually has a subset of the tasks from the original 
> stage. If we mark it as an original one, the metrics might lead us into 
> pitfalls.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting

2021-12-12 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-37481.
--
Fix Version/s: 3.3.0
   3.2.0
 Assignee: Kent Yao
   Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34735

> Disappearance of skipped stages mislead the bug hunting 
> 
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.3.0, 3.2.0
>
>
> With FetchFailedException and Map Stage Retries:
> When rerunning the spark-sql shell with the original SQL in 
> [https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315]
> !https://user-images.githubusercontent.com/8326978/143821530-ff498caa-abce-483d-a24b-315aacf7e0a0.png!
> 1. stage 3 threw FetchFailedException and caused itself and its parent 
> stage (stage 2) to retry
> 2. stage 2 was skipped before but its attemptId was still 0, so when its 
> retry happened it got removed from `Skipped Stages` 
> The DAG of Job 2 no longer shows that stage 2 is skipped.
> !https://user-images.githubusercontent.com/8326978/143824666-6390b64a-a45b-4bc8-b05d-c5abbb28cdef.png!
> Besides, a retried stage usually has a subset of the tasks from the original 
> stage. If we mark it as an original one, the metrics might lead us into 
> pitfalls.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-10 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441796#comment-17441796
 ] 

wuyi commented on SPARK-36575:
--

FYI: the fix is reverted due to test issues.

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: hujiahua
>Priority: Major
> Fix For: 3.3.0
>
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an ExecutorLost 
> event simultaneously. There is a race condition here, which can cause the task 
> to never be rescheduled again and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway through 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> results. 
> Then `HeartbeatReceiver.expireDeadHosts` executed `killAndReplaceExecutor`, 
> which made `TaskSchedulerImpl.executorLost` execute twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so the two 
> `executorLost` calls drove `copiesRunning(index)` to -1, which left the stage hanging. 
> Related log when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)
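A hedged sketch of the copiesRunning problem and a guard against it (hypothetical bookkeeping, not the real TaskSetManager): executorLost may be processed twice for the same executor, so the decrement must only happen for attempts that are still known to be running.

{code:scala}
import scala.collection.mutable

final class RunningCopies(numTasks: Int) {
  private val copiesRunning = Array.fill(numTasks)(0)
  private val runningAttempts = mutable.Map.empty[Long, Int] // TID -> task index

  def taskStarted(tid: Long, index: Int): Unit = {
    runningAttempts(tid) = index
    copiesRunning(index) += 1
  }

  def executorLost(lostTids: Seq[Long]): Unit = {
    lostTids.foreach { tid =>
      // Remove-and-decrement per TID: a second executorLost for the same executor
      // finds nothing to remove, so copiesRunning can no longer drop below zero.
      runningAttempts.remove(tid).foreach(index => copiesRunning(index) -= 1)
    }
  }

  def copies(index: Int): Int = copiesRunning(index)
}
{code}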



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-36575:


Assignee: hujiahua

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: hujiahua
>Priority: Major
> Fix For: 3.3.0
>
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an ExecutorLost 
> event simultaneously. There is a race condition here, which can cause the task 
> to never be rescheduled again and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway through 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> results. 
> Then `HeartbeatReceiver.expireDeadHosts` executed `killAndReplaceExecutor`, 
> which made `TaskSchedulerImpl.executorLost` execute twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so the two 
> `executorLost` calls drove `copiesRunning(index)` to -1, which left the stage hanging. 
> Related log when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-36575:


Assignee: (was: wuyi)

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Priority: Major
> Fix For: 3.3.0
>
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an ExecutorLost 
> event simultaneously. There is a race condition here, which can cause the task 
> to never be rescheduled again and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway through 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> results. 
> Then `HeartbeatReceiver.expireDeadHosts` executed `killAndReplaceExecutor`, 
> which made `TaskSchedulerImpl.executorLost` execute twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so the two 
> `executorLost` calls drove `copiesRunning(index)` to -1, which left the stage hanging. 
> Related log when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441486#comment-17441486
 ] 

wuyi commented on SPARK-36575:
--

Issue resolved by [https://github.com/apache/spark/pull/33872.]

 

To clarify, the change doesn't really fix a hang issue; it's an improvement. The 
hanging issue doesn't exist on the master branch, only on 2.3 (confirmed).

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: wuyi
>Priority: Major
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an ExecutorLost 
> event simultaneously. There is a race condition here, which can cause the task 
> to never be rescheduled again and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway through 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> results. 
> Then `HeartbeatReceiver.expireDeadHosts` executed `killAndReplaceExecutor`, 
> which made `TaskSchedulerImpl.executorLost` execute twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so the two 
> `executorLost` calls drove `copiesRunning(index)` to -1, which left the stage hanging. 
> Related log when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36575:
-
Issue Type: Improvement  (was: Bug)

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: wuyi
>Priority: Major
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. At the same time the driver may find that the 
> executor's heartbeat timed out, so the driver also needs to handle an ExecutorLost 
> event simultaneously. There is a race condition here, which can cause the task 
> to never be rescheduled again and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway through 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> results. 
> Then `HeartbeatReceiver.expireDeadHosts` executed `killAndReplaceExecutor`, 
> which made `TaskSchedulerImpl.executorLost` execute twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so the two 
> `executorLost` calls drove `copiesRunning(index)` to -1, which left the stage hanging. 
> Related log when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)
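A minimal sketch of the double decrement described above. This is a toy model only, with simplified names; it is not Spark's actual `TaskSetManager` or `TaskSchedulerImpl` code, and the rescheduling condition in the comments is an illustrative assumption.

{code:java}
// Toy model of the bookkeeping race described in this issue.
object CopiesRunningSketch {

  class ToyTaskSetManager(numTasks: Int) {
    val copiesRunning = Array.fill(numTasks)(0)
    val successful = Array.fill(numTasks)(false)

    def taskStarted(index: Int): Unit = copiesRunning(index) += 1

    // Asynchronous path (the TaskResultGetter thread in the report).
    def handleSuccessfulTask(index: Int): Unit = successful(index) = true

    // Executor-lost path; per the report it can run twice for the same executor
    // (heartbeat expiry followed by killAndReplaceExecutor).
    def executorLost(index: Int): Unit = copiesRunning(index) -= 1
  }

  def main(args: Array[String]): Unit = {
    val tsm = new ToyTaskSetManager(numTasks = 1)
    tsm.taskStarted(0)          // copiesRunning(0) == 1

    tsm.executorLost(0)         // heartbeat timeout handled first: 0
    tsm.handleSuccessfulTask(0) // async success handler runs afterwards
    tsm.executorLost(0)         // killAndReplaceExecutor triggers a second loss: -1

    // The invariant copiesRunning(index) >= 0 is broken. A scheduler that only
    // re-offers a task while copiesRunning(index) == 0 && !successful(index)
    // (an illustrative condition, not the real check) never reschedules it,
    // so the stage hangs.
    assert(tsm.copiesRunning(0) == -1)
    println(s"copiesRunning(0) = ${tsm.copiesRunning(0)}")
  }
}
{code}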



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36575:
-
Fix Version/s: 3.3.0

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: wuyi
>Priority: Major
> Fix For: 3.3.0
>
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. If the driver finds at the same time that the 
> executor's heartbeat has timed out, it also needs to handle an ExecutorLost 
> event. There is a race condition here which can prevent the task from ever 
> being rescheduled, leaving the stage hanging.
>  The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
>  So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> values. 
> `HeartbeatReceiver.expireDeadHosts` then executes `killAndReplaceExecutor`, 
> which makes `TaskSchedulerImpl.executorLost` run twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so running it twice 
> drives `copiesRunning(index)` to -1, which leaves the stage hanging. 
> Related log output from when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi reassigned SPARK-36575:


Assignee: wuyi

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Assignee: wuyi
>Priority: Major
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. If the driver finds at the same time that the 
> executor's heartbeat has timed out, it also needs to handle an ExecutorLost 
> event. There is a race condition here which can prevent the task from ever 
> being rescheduled, leaving the stage hanging.
>  The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
>  So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> values. 
> `HeartbeatReceiver.expireDeadHosts` then executes `killAndReplaceExecutor`, 
> which makes `TaskSchedulerImpl.executorLost` run twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so running it twice 
> drives `copiesRunning(index)` to -1, which leaves the stage hanging. 
> Related log output from when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36575) Executor lost may cause spark stage to hang

2021-11-09 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-36575.
--
Resolution: Fixed

> Executor lost may cause spark stage to hang
> ---
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.3.3
>Reporter: hujiahua
>Priority: Major
>
> When an executor finishes a task of some stage, the driver receives a 
> `StatusUpdate` event to handle it. If the driver finds at the same time that the 
> executor's heartbeat has timed out, it also needs to handle an ExecutorLost 
> event. There is a race condition here which can prevent the task from ever 
> being rescheduled, leaving the stage hanging.
>  The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an 
> asynchronous thread to handle the successful task, which means the synchronized 
> lock of `TaskSchedulerImpl` is released prematurely midway 
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
>  So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous 
> thread then goes on to handle the successful task. This leaves 
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong 
> values. 
> `HeartbeatReceiver.expireDeadHosts` then executes `killAndReplaceExecutor`, 
> which makes `TaskSchedulerImpl.executorLost` run twice. 
> `copiesRunning(index) -= 1` is processed in `executorLost`, so running it twice 
> drives `copiesRunning(index)` to -1, which leaves the stage hanging. 
> Related log output from when the issue occurred: 
>  21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: 
> Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 
> 366724, partition 4004, ANY, 7994 bytes)
>  21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: 
> Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 
> 140830 ms
>  21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost 
> task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): 
> ExecutorLostFailure (executor 366724 exited caused by one of the running 
> tasks) Reason: Executor heartbeat timed out after 140830 ms
>  21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished 
> task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 
> (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: 
> Executor 366724 on 10.109.89.3 killed by driver.
>  21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] 
> ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417416)
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 
> 10.109.89.3, 43402, None)
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417416)
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor 
> lost: 366724 (epoch 417473)
>  21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] 
> BlockManagerMasterEndpoint: Trying to remove executor 366724 from 
> BlockManagerMaster.
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: 
> Removed 366724 successfully in removeExecutor
>  21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle 
> files lost for executor: 366724 (epoch 417473)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-09-29 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421971#comment-17421971
 ] 

wuyi commented on SPARK-18105:
--

[~vladimir.prus] Hi, could you also file a sub-task under 
https://issues.apache.org/jira/browse/SPARK-20624? That sounds like an issue in 
decommissioning.

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When LZ4 is used to compress the shuffle files, decompression may fail with a 
> "Stream is corrupted" error:
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89
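As a diagnostic aside (not a suggestion made in the issue itself): since the corruption reports center on the LZ4 shuffle path, one quick triage step is to re-run a failing job with a different I/O compression codec and see whether the error disappears. A minimal sketch, assuming a standard SparkSession setup; elsewhere in this thread SPARK-32658 is pointed to as a known root cause on 3.0.0, so this is only a way to narrow things down, not a fix.

{code:java}
import org.apache.spark.sql.SparkSession

// Triage only: switch the codec used for shuffle outputs, spills and
// broadcasts away from lz4 and re-run the failing job.
val spark = SparkSession.builder()
  .appName("lz4-triage")
  .config("spark.shuffle.compress", "true")        // default value, shown for clarity
  .config("spark.io.compression.codec", "zstd")    // try zstd or snappy instead of lz4
  .getOrCreate()
{code}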



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager

2021-09-12 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-36700.
--
Fix Version/s: 3.3.0
   3.0.4
   3.1.3
   3.2.0
 Assignee: wuyi
   Resolution: Fixed

> BlockManager re-registration is broken due to deferred removal of 
> BlockManager 
> ---
>
> Key: SPARK-36700
> URL: https://issues.apache.org/jira/browse/SPARK-36700
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Blocker
> Fix For: 3.2.0, 3.1.3, 3.0.4, 3.3.0
>
>
> Due to the deferred removal of BlockManager (introduced in SPARK-35011), an 
> expected BlockManager re-registration could be refused as the inactive 
> BlockManager still exists in the map `blockManagerInfo`:
> https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551
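A simplified sketch of the failure mode described above. The names and structure are illustrative only; this is not the real `BlockManagerMasterEndpoint` code. With deferred removal, a stale entry stays in `blockManagerInfo`, so a later re-registration from the same executor is refused instead of refreshing the registration.

{code:java}
import scala.collection.mutable

// Illustrative model: a registry that defers removal keeps an inactive entry
// around, and a naive "already registered" check then rejects the executor's
// legitimate re-registration attempt.
object DeferredRemovalSketch {
  case class BlockManagerId(executorId: String)
  case class Info(id: BlockManagerId, var active: Boolean)

  private val blockManagerInfo = mutable.HashMap[BlockManagerId, Info]()

  def register(id: BlockManagerId): Boolean = {
    if (blockManagerInfo.contains(id)) {
      false                                   // refused: stale entry still present
    } else {
      blockManagerInfo(id) = Info(id, active = true)
      true
    }
  }

  // Deferred removal: the block manager is only marked inactive, not removed.
  def markInactive(id: BlockManagerId): Unit =
    blockManagerInfo.get(id).foreach(_.active = false)

  def main(args: Array[String]): Unit = {
    val id = BlockManagerId("exec-1")
    assert(register(id))      // initial registration succeeds
    markInactive(id)          // executor considered lost; removal is deferred
    assert(!register(id))     // expected re-registration is refused
    println("re-registration refused while the inactive entry remains")
  }
}
{code}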



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager

2021-09-12 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413865#comment-17413865
 ] 

wuyi commented on SPARK-36700:
--

Reverted by [https://github.com/apache/spark/pull/33942] and backported to 3.2, 
3.1, 3.0.

> BlockManager re-registration is broken due to deferred removal of 
> BlockManager 
> ---
>
> Key: SPARK-36700
> URL: https://issues.apache.org/jira/browse/SPARK-36700
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
> Due to the deferred removal of BlockManager (introduced in SPARK-35011), an 
> expected BlockManager re-registration could be refused as the inactive 
> BlockManager still exists in the map `blockManagerInfo`:
> https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager

2021-09-08 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412290#comment-17412290
 ] 

wuyi commented on SPARK-36700:
--

I'm working on the fix.

> BlockManager re-registration is broken due to deferred removal of 
> BlockManager 
> ---
>
> Key: SPARK-36700
> URL: https://issues.apache.org/jira/browse/SPARK-36700
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
> Due to the deferred removal of BlockManager (introduced in SPARK-35011), an 
> expected BlockManager re-registration could be refused as the inactive 
> BlockManager still exists in the map `blockManagerInfo`:
> https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager

2021-09-08 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412285#comment-17412285
 ] 

wuyi commented on SPARK-36700:
--

cc [~mridulm80] [~sumeet.gajjar] [~attilapiros]

cc [~gengliang] for the blocker fyi

> BlockManager re-registration is broken due to deferred removal of 
> BlockManager 
> ---
>
> Key: SPARK-36700
> URL: https://issues.apache.org/jira/browse/SPARK-36700
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
> Due to the deferred removal of BlockManager (introduced in SPARK-35011), an 
> expected BlockManager re-registration could be refused as the inactive 
> BlockManager still exists in the map `blockManagerInfo`:
> https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager

2021-09-08 Thread wuyi (Jira)
wuyi created SPARK-36700:


 Summary: BlockManager re-registration is broken due to deferred 
removal of BlockManager 
 Key: SPARK-36700
 URL: https://issues.apache.org/jira/browse/SPARK-36700
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.2, 3.0.3, 3.2.0, 3.3.0
Reporter: wuyi


Due to the deferred removal of BlockManager (introduced in SPARK-35011), an 
expected BlockManager re-registration could be refused as the inactive 
BlockManager still exists in the map `blockManagerInfo`:

https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36614) Executor loss reason shows "worker lost" rather "Executor decommission"

2021-08-30 Thread wuyi (Jira)
wuyi created SPARK-36614:


 Summary: Executor loss reason shows "worker lost" rather "Executor 
decommission"
 Key: SPARK-36614
 URL: https://issues.apache.org/jira/browse/SPARK-36614
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.1.2, 3.2.0, 3.3.0
Reporter: wuyi
 Attachments: WeChat13c9f1345a096ff83d193e4e9853b165.png

For a lost executor caused by decommission, the loss reason shows "worker 
lost", while it is actually due to decommission.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36614) Executor loss reason shows "worker lost" rather "Executor decommission"

2021-08-30 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36614:
-
Attachment: WeChat13c9f1345a096ff83d193e4e9853b165.png

> Executor loss reason shows "worker lost" rather "Executor decommission"
> ---
>
> Key: SPARK-36614
> URL: https://issues.apache.org/jira/browse/SPARK-36614
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
> Attachments: WeChat13c9f1345a096ff83d193e4e9853b165.png
>
>
> For a lost executor caused by decommission, the loss reason shows "worker 
> lost", while it is actually due to decommission.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-30 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406622#comment-17406622
 ] 

wuyi edited comment on SPARK-18105 at 8/30/21, 8:31 AM:


FYI, for users who hit the "Stream is corrupted" error in 3.0.0, please try to 
apply the fix of SPARK-32658 first as it's a known bug that can cause the error.


was (Author: ngone51):
FYI, for users who hit the "Stream is corrupted" error, please try to apply the 
fix of SPARK-32658 first as it's a known bug that can cause the error.

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When LZ4 is used to compress the shuffle files, decompression may fail with a 
> "Stream is corrupted" error:
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-30 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406622#comment-17406622
 ] 

wuyi commented on SPARK-18105:
--

FYI, for users who hit the "Stream is corrupted" error, please try to apply the 
fix of SPARK-32658 first as it's a known bug that can cause the error.

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When LZ4 is used to compress the shuffle files, decompression may fail with a 
> "Stream is corrupted" error:
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36196) Spark FetchFailedException Stream is corrupted Error

2021-08-30 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406620#comment-17406620
 ] 

wuyi commented on SPARK-36196:
--

Hi [~arghya18] Did you try to apply the fix of 
https://issues.apache.org/jira/browse/SPARK-32658?

> Spark FetchFailedException Stream is corrupted Error
> 
>
> Key: SPARK-36196
> URL: https://issues.apache.org/jira/browse/SPARK-36196
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes, PySpark, Spark Core
>Affects Versions: 3.1.1
> Environment: Spark on K8s
>Reporter: Arghya Saha
>Priority: Major
>
> I am running Spark on K8s. Thousands of jobs run every day, and a few of them 
> fail each day (not the same jobs) with the exception below; they succeed on 
> retry. I have read about this error in multiple Jira issues and saw it was 
> resolved in Spark 3.0.0, but I am still getting the error on a later 
> version.
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
> org.apache.spark.shuffle.FetchFailedException: Stream is corrupted at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:770)
>  at 
> org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
>  at java.base/java.io.BufferedInputStream.fill(Unknown Source) at 
> java.base/java.io.BufferedInputStream.read1(Unknown Source) at 
> java.base/java.io.BufferedInputStream.read(Unknown Source) at 
> java.base/java.io.DataInputStream.read(Unknown Source) at 
> org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899) at 
> org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733) at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
>  at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:494) at 
> scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) 
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown
>  Source) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>  at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:817)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.(SortMergeJoinExec.scala:687)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:197)
>  at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.run(Task.scala:131) at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> java.io.IOException: Stream is corrupted at 
> net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:250) at 
> net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at 
> org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
>  ... 38 moreCaused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 

[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-24 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403556#comment-17403556
 ] 

wuyi commented on SPARK-36558:
--

Discussed with [~vsowrirajan] offline. The issue won't happen in production 
code since "runningStages" is only accessed from the single 
DAGSchedulerEventProcessLoop thread. Closing as Won't Fix.
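A minimal sketch of why single-thread confinement resolves the concern. This illustrates the general pattern only and is not the actual DAGSchedulerEventProcessLoop implementation: all mutations of the shared state are funneled through one event-processing thread, so the interleaving described in the report cannot occur.

{code:java}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Illustrative single-threaded event loop: every event is handled by the one
// loop thread, so the mutable set below is never touched concurrently and
// needs no extra locking.
object SingleThreadedEventLoopSketch {
  sealed trait Event
  final case class StageSubmitted(id: Int) extends Event
  final case class StageCompleted(id: Int) extends Event
  case object Stop extends Event

  private val events = new LinkedBlockingQueue[Event]()
  private val runningStages = mutable.Set[Int]()

  private val loopThread = new Thread(() => {
    var running = true
    while (running) {
      events.take() match {
        case StageSubmitted(id) => runningStages += id
        case StageCompleted(id) => runningStages -= id
        case Stop               => running = false
      }
    }
  })

  def main(args: Array[String]): Unit = {
    loopThread.start()
    events.put(StageSubmitted(1))   // other threads only post events...
    events.put(StageCompleted(1))   // ...they never mutate runningStages directly
    events.put(Stop)
    loopThread.join()
    assert(runningStages.isEmpty)
    println("all state changes happened on the single loop thread")
  }
}
{code}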

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> A stage whose tasks have all finished but whose shuffle merge finalization is 
> still ongoing can lead to a job hang. The problem is that such a stage is 
> considered a "missing" stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]),
>  which breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks in stage A 
> have finished, stage A is skipped directly when submitting stage B. However, 
> with this bug, stage A is submitted, and submitting a stage with no tasks to 
> run cannot add its child stage to the waiting-stage list, which leads to the 
> job hang in the end.
>  
> The example to reproduce:
> First, change `MyRDD` to allow it to compute:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
> Int)] = {
>    Iterator.single((1, 1))
>  }{code}
>  Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=after wait==")
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-24 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-36558.
--
Resolution: Won't Fix

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> A stage whose tasks have all finished but whose shuffle merge finalization is 
> still ongoing can lead to a job hang. The problem is that such a stage is 
> considered a "missing" stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]),
>  which breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks in stage A 
> have finished, stage A is skipped directly when submitting stage B. However, 
> with this bug, stage A is submitted, and submitting a stage with no tasks to 
> run cannot add its child stage to the waiting-stage list, which leads to the 
> job hang in the end.
>  
> The example to reproduce:
> First, change `MyRDD` to allow it to compute:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
> Int)] = {
>    Iterator.single((1, 1))
>  }{code}
>  Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=after wait==")
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36558:
-
Description: 
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:

First, change `MyRDD` to allow it to compute:
{code:java}
override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] = {
   Iterator.single((1, 1))
 }{code}

 Then run this test:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=after wait==")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 

  was:
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:

First, change `MyRDD` to allow it compute:
override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] = \{
  Iterator.single((1, 1))
}
Then run this test:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=after wait==")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> 

[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36558:
-
Description: 
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:

First, change `MyRDD` to allow it compute:
override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] = \{
  Iterator.single((1, 1))
}
Then run this test:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=after wait==")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 

  was:
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=after wait==")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> 

[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36558:
-
Description: 
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=after wait==")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
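
For intuition, here is a minimal, self-contained model of the scheduling decision 
described above (the names are assumptions for illustration, not Spark's actual 
DAGScheduler classes):
{code:java}
// A stage counts as "available" only when all tasks have finished AND merge
// finalization has completed, so a finished-but-unfinalized stage is treated as
// a missing parent and re-submitted even though it has zero tasks left to run.
object MissingStageModel {
  final case class Stage(numTasks: Int, finishedTasks: Int, mergeFinalized: Boolean) {
    def isAvailable: Boolean = finishedTasks == numTasks && mergeFinalized
    def numMissingTasks: Int = numTasks - finishedTasks
  }

  def main(args: Array[String]): Unit = {
    val stageA = Stage(numTasks = 20, finishedTasks = 20, mergeFinalized = false)
    println(s"treated as a missing parent: ${!stageA.isAvailable}") // true -> re-submitted
    println(s"tasks left to run: ${stageA.numMissingTasks}")        // 0 -> no completions
    // With no task-completion events, nothing ever moves the child stage out of
    // the waiting list, which is exactly the hang reproduced by the test above.
  }
}
{code}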
 

  was:
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing 

[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-23 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403456#comment-17403456
 ] 

wuyi commented on SPARK-36558:
--

[~vsowrirajan] Sorry, I missed one tweak in `MyRDD`: we need to allow `MyRDD` to 
actually compute its partitions:
{code:java}
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
  Iterator.single((1, 1))
}
{code}
Could you help verify it again?

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
>  And it breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted. And submitting a stage 
> with no tasks to run would not be able to add its child stage into the 
> waiting stage list, which leads to the job hang in the end.
>  
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36564) LiveRDDDistribution.toApi throws NullPointerException

2021-08-23 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36564:
-
Summary: LiveRDDDistribution.toApi throws NullPointerException  (was: 
LiveRDD.doUpdate throws NullPointerException)

> LiveRDDDistribution.toApi throws NullPointerException
> -
>
> Key: SPARK-36564
> URL: https://issues.apache.org/jira/browse/SPARK-36564
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> {code:java}
> 21/08/23 12:26:29 ERROR AsyncEventQueue: Listener AppStatusListener threw an 
> exception
> java.lang.NullPointerException
>   at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
>   at 
> com.google.common.collect.MapMakerInternalMap.putIfAbsent(MapMakerInternalMap.java:3507)
>   at 
> com.google.common.collect.Interners$WeakInterner.intern(Interners.java:85)
>   at 
> org.apache.spark.status.LiveEntityHelpers$.weakIntern(LiveEntity.scala:696)
>   at 
> org.apache.spark.status.LiveRDDDistribution.toApi(LiveEntity.scala:563)
>   at 
> org.apache.spark.status.LiveRDD.$anonfun$doUpdate$4(LiveEntity.scala:629)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at 
> scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at org.apache.spark.status.LiveRDD.doUpdate(LiveEntity.scala:629)
>   at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:51)
>   at 
> org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1206)
>   at 
> org.apache.spark.status.AppStatusListener.maybeUpdate(AppStatusListener.scala:1212)
>   at 
> org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6(AppStatusListener.scala:956)
>   at 
> org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6$adapted(AppStatusListener.scala:956)
>   at 
> scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>   at 
> org.apache.spark.status.AppStatusListener.flush(AppStatusListener.scala:1015)
>   at 
> org.apache.spark.status.AppStatusListener.onExecutorMetricsUpdate(AppStatusListener.scala:956)
>   at 
> org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:59)
>   at 
> org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>   at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
>   at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
>   at 
> scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
>   at 
> org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36564) LiveRDD.doUpdate throws NullPointerException

2021-08-23 Thread wuyi (Jira)
wuyi created SPARK-36564:


 Summary: LiveRDD.doUpdate throws NullPointerException
 Key: SPARK-36564
 URL: https://issues.apache.org/jira/browse/SPARK-36564
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.2, 3.0.3, 3.2.0, 3.3.0
Reporter: wuyi


{code:java}
21/08/23 12:26:29 ERROR AsyncEventQueue: Listener AppStatusListener threw an 
exception
java.lang.NullPointerException
at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
at 
com.google.common.collect.MapMakerInternalMap.putIfAbsent(MapMakerInternalMap.java:3507)
at 
com.google.common.collect.Interners$WeakInterner.intern(Interners.java:85)
at 
org.apache.spark.status.LiveEntityHelpers$.weakIntern(LiveEntity.scala:696)
at 
org.apache.spark.status.LiveRDDDistribution.toApi(LiveEntity.scala:563)
at 
org.apache.spark.status.LiveRDD.$anonfun$doUpdate$4(LiveEntity.scala:629)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.status.LiveRDD.doUpdate(LiveEntity.scala:629)
at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:51)
at 
org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1206)
at 
org.apache.spark.status.AppStatusListener.maybeUpdate(AppStatusListener.scala:1212)
at 
org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6(AppStatusListener.scala:956)
at 
org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6$adapted(AppStatusListener.scala:956)
at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
at 
org.apache.spark.status.AppStatusListener.flush(AppStatusListener.scala:1015)
at 
org.apache.spark.status.AppStatusListener.onExecutorMetricsUpdate(AppStatusListener.scala:956)
at 
org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:59)
at 
org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at 
org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at 
org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
at 
org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at 
org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at 
scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at 
org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at 
org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
at 
org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
{code}
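
The trace shows a null string reaching Guava's weak interner (Interner.intern rejects 
null via Preconditions.checkNotNull). Below is a small sketch of that failure mode 
together with a null-tolerant guard of the kind a fix might use; the helper names are 
illustrative, not Spark's actual LiveEntityHelpers code, and it assumes Guava is on 
the classpath.
{code:java}
import com.google.common.collect.Interners

object WeakInternNpeSketch {
  private val interner = Interners.newWeakInterner[String]()

  // Matches the stack trace: Guava's Interner.intern(null) fails the
  // Preconditions.checkNotNull check with a NullPointerException.
  def weakIntern(s: String): String = interner.intern(s)

  // One possible guard (illustrative only): tolerate null instead of throwing.
  def weakInternOrNull(s: String): String =
    if (s == null) null else interner.intern(s)

  def main(args: Array[String]): Unit = {
    println(weakInternOrNull(null)) // prints "null", no exception
    println(weakIntern(null))       // throws NullPointerException
  }
}
{code}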



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-22 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36558:
-
Description: 
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add its child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 

  was:
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is 

[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-22 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-36558:
-
Description: 
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted. And submitting a stage with 
no tasks to run would not be able to add child stage into the waiting stage 
list, which leads to the job hang in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 

  was:
 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted, which leads to the job hang 
in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> 

[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-22 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402942#comment-17402942
 ] 

wuyi commented on SPARK-36558:
--

cc [~mridul] [~mshen] [~csingh]

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Blocker
>
>  
> For a stage that all tasks are finished but with ongoing finalization can 
> lead to job hang. The problem is that such stage is considered as a "missing" 
> stage (see 
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
>  And it breaks the original assumption that a "missing" stage must have tasks 
> to run. 
> Normally, if stage A is the parent of (result) stage B and all tasks have 
> finished in stage A, stage A will be skipped directly when submitting stage 
> B. However, with this bug, stage A will be submitted, which leads to the job 
> hang in the end.
>  
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
> "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
> sc,
> sc.dagScheduler.taskScheduler,
> sc.listenerBus,
> sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
> sc.env.blockManager.master,
> sc.env) {
> override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = 
> {
>   // By this, we can mimic a stage with all tasks finished
>   // but finalization is incomplete.
>   latch.countDown()
> }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
> HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
> mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

2021-08-22 Thread wuyi (Jira)
wuyi created SPARK-36558:


 Summary: Stage has all tasks finished but with ongoing 
finalization can cause job hang
 Key: SPARK-36558
 URL: https://issues.apache.org/jira/browse/SPARK-36558
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.2.0, 3.3.0
Reporter: wuyi


 

For a stage that all tasks are finished but with ongoing finalization can lead 
to job hang. The problem is that such stage is considered as a "missing" stage 
(see 
[https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).]
 And it breaks the original assumption that a "missing" stage must have tasks 
to run. 

Normally, if stage A is the parent of (result) stage B and all tasks have 
finished in stage A, stage A will be skipped directly when submitting stage B. 
However, with this bug, stage A will be submitted, which leads to the job hang 
in the end.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
sc,
sc.dagScheduler.taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env) {
override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
  // By this, we can mimic a stage with all tasks finished
  // but finalization is incomplete.
  latch.countDown()
}
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36543) Decommission logs too frequent when waiting migration to finish

2021-08-18 Thread wuyi (Jira)
wuyi created SPARK-36543:


 Summary: Decommission logs too frequent when waiting migration to 
finish
 Key: SPARK-36543
 URL: https://issues.apache.org/jira/browse/SPARK-36543
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.1.0, 3.2.0, 3.3.0
Reporter: wuyi


{code:java}
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: Checking to see if we can 
shutdown.
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: No running tasks, checking 
migrations
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: All blocks not yet 
migrated.
...{code}
Migrating data (shuffle or RDD blocks) can take a while, so logging these messages 
every second is too noisy.
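
One straightforward way to tame this is to rate-limit the waiting message. This is a 
sketch only: the helper name and the 30-second threshold are assumptions, not the 
actual Spark change.
{code:java}
object ThrottledDecommissionLog {
  // Emit the "still waiting for migration" message at most once per interval.
  private val logIntervalMs = 30 * 1000L // assumed threshold
  private var lastLogTimeMs = 0L

  def maybeLogWaiting(msg: String): Unit = synchronized {
    val now = System.currentTimeMillis()
    if (now - lastLogTimeMs >= logIntervalMs) {
      lastLogTimeMs = now
      println(msg) // stand-in for logInfo
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulate the once-per-second poll loop seen in the log above:
    // only the first iteration actually logs.
    for (_ <- 1 to 5) {
      maybeLogWaiting("All blocks not yet migrated.")
      Thread.sleep(1000)
    }
  }
}
{code}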



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36532) Deadlock in CoarseGrainedExecutorBackend.onDisconnected

2021-08-17 Thread wuyi (Jira)
wuyi created SPARK-36532:


 Summary: Deadlock in CoarseGrainedExecutorBackend.onDisconnected
 Key: SPARK-36532
 URL: https://issues.apache.org/jira/browse/SPARK-36532
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0
Reporter: wuyi


The deadlock has exactly the same root cause as SPARK-14180 but happens in a 
different code path.
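
For context, here is a minimal, generic illustration of this class of deadlock (the 
general shape only; this is not Spark's actual code): a thread calls System.exit() 
while holding a lock that a JVM shutdown hook also needs, so exit waits for the hook 
and the hook waits for the lock.
{code:java}
object ExitDeadlockSketch {
  private val lock = new Object

  def main(args: Array[String]): Unit = {
    // A shutdown hook that needs `lock` for its cleanup.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      lock.synchronized { println("cleanup done") }
    }))

    // The calling thread still holds `lock` when it calls System.exit():
    // exit() waits for the shutdown hook to finish, while the hook blocks
    // trying to acquire `lock` -> deadlock.
    lock.synchronized {
      System.exit(0)
    }
  }
}
{code}
Running this sketch hangs forever, which is the observed symptom.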



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle

2021-08-16 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi resolved SPARK-36530.
--
Resolution: Won't Fix

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> When every block of a map output is bigger than 
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map output. 
> When no map output has any pushed blocks, the shuffle has no push at all, and 
> there is no need to send the finalize request.
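
The check described above is simple; a hypothetical sketch (assumed names, not 
Spark's API) of skipping the finalize RPC when nothing was pushed is shown below. 
Note the issue itself was resolved as Won't Fix, so this is illustration only.
{code:java}
// Send the merge-finalize request only if at least one map task pushed a block.
final case class MapPushStats(mapId: Long, blocksPushed: Int)

object SkipFinalizeSketch {
  def shouldFinalizeMerge(stats: Seq[MapPushStats]): Boolean =
    stats.exists(_.blocksPushed > 0)

  def main(args: Array[String]): Unit = {
    // Every block exceeded spark.shuffle.push.maxBlockSizeToPush, so nothing was pushed.
    val noPush = Seq(MapPushStats(0L, 0), MapPushStats(1L, 0))
    println(s"finalize needed: ${shouldFinalizeMerge(noPush)}") // false -> skip the RPC
  }
}
{code}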



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle

2021-08-16 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400118#comment-17400118
 ] 

wuyi commented on SPARK-36530:
--

SGTM. Could you update SPARK-33701 to include this part? Then, I'll close this 
one.

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> When every block of a map output is bigger than 
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map output. 
> When no map output has any pushed blocks, the shuffle has no push at all, and 
> there is no need to send the finalize request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle

2021-08-16 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400089#comment-17400089
 ] 

wuyi commented on SPARK-36530:
--

cc [~mridulm80] [~mshen] any thoughts?

> Avoid finalizing when there's no push at all in a shuffle
> -
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: wuyi
>Priority: Major
>
> When every block of a map output is bigger than 
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map output. 
> When no map output has any pushed blocks, the shuffle has no push at all, and 
> there is no need to send the finalize request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


