[jira] [Updated] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister
[ https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-48394:
-------------------------
    Description:
There is only one valid mapstatus for the same {{mapIndex}} at any given time in Spark. {{mapIdToMapIndex}} should follow the same rule to avoid inconsistency.

The issue leads to a shuffle fetch failure and, eventually, a job failure. It happens this way:
 # Stage A computes the partition P0 by task t1 (TID, a.k.a. mapId) on executor e1
 # Executor Y starts decommission
 # Executor Y reports a false-positive loss to the driver during its decommission
 # Stage B reuses the shuffle dependency of Stage A, and computes the partition P0 again by task t2 on executor e2
 # When task t2 finishes, we see two items ((t1 -> P0), (t2 -> P0)) for the same partition in {{mapIdToMapIndex}} but only one item (mapStatuses(P0)=MapStatus(t2, e2)) in {{mapStatuses}}.
 # Executor Y starts to migrate task t1's mapstatus (to executor e3, for example) and calls {{updateMapOutput}} on the driver. Given 5), we'd use mapId (i.e., t1) to get mapIndex (i.e., P0) and use P0 to get task t2's mapstatus.
     // updateMapOutput
     val mapIndex = mapIdToMapIndex.get(mapId)
     val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
 # Task t2's mapstatus location would then be updated to executor e3, although the output is actually still located on executor e2. This eventually leads to the fetch failure.

  was:There is only one valid mapstatus for the same {{mapIndex}} at the same time in Spark. {{mapIdToMapIndex}} should also follows the same rule to avoid chaos.
> Cleanup mapIdToMapIndex on mapoutput unregister
> -----------------------------------------------
>
>                 Key: SPARK-48394
>                 URL: https://issues.apache.org/jira/browse/SPARK-48394
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0, 3.5.1
>            Reporter: wuyi
>            Assignee: wuyi
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> There is only one valid mapstatus for the same {{mapIndex}} at any given time
> in Spark. {{mapIdToMapIndex}} should follow the same rule to avoid
> inconsistency.
>
> The issue leads to a shuffle fetch failure and, eventually, a job failure. It
> happens this way:
>  # Stage A computes the partition P0 by task t1 (TID, a.k.a. mapId) on
> executor e1
>  # Executor Y starts decommission
>  # Executor Y reports a false-positive loss to the driver during its
> decommission
>  # Stage B reuses the shuffle dependency of Stage A, and computes the
> partition P0 again by task t2 on executor e2
>  # When task t2 finishes, we see two items ((t1 -> P0), (t2 -> P0)) for the
> same partition in {{mapIdToMapIndex}} but only one item
> (mapStatuses(P0)=MapStatus(t2, e2)) in {{mapStatuses}}.
>  # Executor Y starts to migrate task t1's mapstatus (to executor e3, for
> example) and calls {{updateMapOutput}} on the driver. Given 5), we'd use
> mapId (i.e., t1) to get mapIndex (i.e., P0) and use P0 to get task t2's
> mapstatus.
>      // updateMapOutput
>      val mapIndex = mapIdToMapIndex.get(mapId)
>      val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
>  # Task t2's mapstatus location would then be updated to executor e3,
> although the output is actually still located on executor e2. This eventually
> leads to the fetch failure.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
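The sequence in the SPARK-48394 description can be traced with a small, self-contained model. This is only a sketch under stated assumptions, not Spark's actual tracker code: `ShuffleStatusSketch`, its `removeMapOutput(mapIndex, cleanup)` method, and the two-field `MapStatus` case class are hypothetical stand-ins. Only the lookup inside `updateMapOutput` is taken from the snippet quoted in the report.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's MapStatus.
final case class MapStatus(mapId: Long, location: String)

// Hypothetical model of the two structures named in the report.
final class ShuffleStatusSketch(numPartitions: Int) {
  // One slot per mapIndex; null means "no valid output registered".
  val mapStatuses: Array[MapStatus] = new Array[MapStatus](numPartitions)
  val mapIdToMapIndex: mutable.HashMap[Long, Int] = mutable.HashMap.empty

  def addMapOutput(mapIndex: Int, status: MapStatus): Unit = {
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  // cleanup = false models the reported behaviour (stale mapId entry is kept);
  // cleanup = true models the change suggested by the issue title.
  def removeMapOutput(mapIndex: Int, cleanup: Boolean): Unit = {
    val old = mapStatuses(mapIndex)
    if (old != null) {
      if (cleanup) mapIdToMapIndex.remove(old.mapId)
      mapStatuses(mapIndex) = null
    }
  }

  // Same lookup as the snippet quoted in the report.
  def updateMapOutput(mapId: Long, newLocation: String): Unit = {
    val mapIndex = mapIdToMapIndex.get(mapId)
    val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
    mapStatusOpt.foreach { s =>
      mapStatuses(mapIndex.get) = s.copy(location = newLocation)
    }
  }
}

object MapIndexCleanupDemo {
  // Replays steps 1 to 7 and returns where the driver believes P0 lives.
  def locationAfterMigration(cleanup: Boolean): String = {
    val shuffle = new ShuffleStatusSketch(numPartitions = 1)
    shuffle.addMapOutput(0, MapStatus(mapId = 1L, location = "e1")) // t1 on e1
    shuffle.removeMapOutput(0, cleanup)                            // false-positive loss
    shuffle.addMapOutput(0, MapStatus(mapId = 2L, location = "e2")) // t2 on e2
    shuffle.updateMapOutput(mapId = 1L, newLocation = "e3")        // late migration of t1
    shuffle.mapStatuses(0).location
  }
}
```

Without the cleanup, the late migration of t1 overwrites the location of t2's status; with the cleanup, the stale mapId no longer resolves to P0 and the update is ignored.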
[jira] [Updated] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister
[ https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-48394:
-------------------------
    Issue Type: Bug  (was: Improvement)

> Cleanup mapIdToMapIndex on mapoutput unregister
> -----------------------------------------------
>
>                 Key: SPARK-48394
>                 URL: https://issues.apache.org/jira/browse/SPARK-48394
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0, 3.5.1
>            Reporter: wuyi
>            Assignee: wuyi
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> There is only one valid mapstatus for the same {{mapIndex}} at any given time
> in Spark. {{mapIdToMapIndex}} should follow the same rule to avoid
> inconsistency.
[jira] [Created] (SPARK-48394) Cleanup mapIdToMapIndex on mapoutput unregister
wuyi created SPARK-48394:
-------------------------

             Summary: Cleanup mapIdToMapIndex on mapoutput unregister
                 Key: SPARK-48394
                 URL: https://issues.apache.org/jira/browse/SPARK-48394
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.5.1, 3.5.0, 4.0.0
            Reporter: wuyi


There is only one valid mapstatus for the same {{mapIndex}} at any given time in Spark. {{mapIdToMapIndex}} should follow the same rule to avoid inconsistency.
[jira] [Updated] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts
[ https://issues.apache.org/jira/browse/SPARK-46052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-46052:
-------------------------
    Description:
Spark has two functions to kill all tasks in a stage:
 * `cancelTasks`: kills all the running tasks in all the stage attempts and also aborts all the stage attempts
 * `killAllTaskAttempts`: only kills all the running tasks in all the stage attempts but does not abort the attempts.

However, there is no use case in Spark where a stage would launch new tasks after all of its tasks have been killed. So I think we can replace `killAllTaskAttempts` with `cancelTasks` directly.

  was:killAllTaskAttempts can be replaced with cancelTasks

> Remove unnecessary TaskScheduler.killAllTaskAttempts
> ----------------------------------------------------
>
>                 Key: SPARK-46052
>                 URL: https://issues.apache.org/jira/browse/SPARK-46052
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.3, 3.2.4, 3.3.3, 3.4.1, 3.5.0
>            Reporter: wuyi
>            Priority: Major
>              Labels: pull-request-available
>
> Spark has two functions to kill all tasks in a stage:
>  * `cancelTasks`: kills all the running tasks in all the stage attempts and
> also aborts all the stage attempts
>  * `killAllTaskAttempts`: only kills all the running tasks in all the stage
> attempts but does not abort the attempts.
> However, there is no use case in Spark where a stage would launch new tasks
> after all of its tasks have been killed. So I think we can replace
> `killAllTaskAttempts` with `cancelTasks` directly.
[jira] [Created] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts
wuyi created SPARK-46052:
-------------------------

             Summary: Remove unnecessary TaskScheduler.killAllTaskAttempts
                 Key: SPARK-46052
                 URL: https://issues.apache.org/jira/browse/SPARK-46052
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.5.0, 3.4.1, 3.3.3, 3.2.4, 3.1.3, 3.0.3
            Reporter: wuyi


killAllTaskAttempts can be replaced with cancelTasks
[jira] [Commented] (SPARK-45527) Task fraction resource request is not expected
[ https://issues.apache.org/jira/browse/SPARK-45527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774756#comment-17774756 ] wuyi commented on SPARK-45527: -- cc [~wbo4958] [~tgraves] > Task fraction resource request is not expected > -- > > Key: SPARK-45527 > URL: https://issues.apache.org/jira/browse/SPARK-45527 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1, 3.3.3, 3.4.1, 3.5.0 >Reporter: wuyi >Priority: Major > > > {code:java} > test("SPARK-XXX") { > import org.apache.spark.resource.{ResourceProfileBuilder, > TaskResourceRequests} > withTempDir { dir => > val scriptPath = createTempScriptWithExpectedOutput(dir, > "gpuDiscoveryScript", > """{"name": "gpu","addresses":["0"]}""") > val conf = new SparkConf() > .setAppName("test") > .setMaster("local-cluster[1, 12, 1024]") > .set("spark.executor.cores", "12") > conf.set(TASK_GPU_ID.amountConf, "0.08") > conf.set(WORKER_GPU_ID.amountConf, "1") > conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath) > conf.set(EXECUTOR_GPU_ID.amountConf, "1") > sc = new SparkContext(conf) > val rdd = sc.range(0, 100, 1, 4) > var rdd1 = rdd.repartition(3) > val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0) > val rp = new ResourceProfileBuilder().require(treqs).build > rdd1 = rdd1.withResources(rp) > assert(rdd1.collect().size === 100) > } > } {code} > In the above test, the 3 tasks generated by rdd1 are expected to be executed > in sequence as we expect "new TaskResourceRequests().cpus(1).resource("gpu", > 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, > those 3 tasks are run in parallel in fact. > The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. > In this case, the "gpu.numParts" is initialized with 12 (1/0.08) and won't > change even if there's a new task resource request (e.g., resource("gpu", > 1.0) in this case). Thus, those 3 tasks are able to be executed in parallel. 
> -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45527) Task fraction resource request is not expected
wuyi created SPARK-45527:
-------------------------

             Summary: Task fraction resource request is not expected
                 Key: SPARK-45527
                 URL: https://issues.apache.org/jira/browse/SPARK-45527
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.5.0, 3.4.1, 3.3.3, 3.2.1
            Reporter: wuyi


{code:java}
test("SPARK-XXX") {
  import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

  withTempDir { dir =>
    val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript",
      """{"name": "gpu","addresses":["0"]}""")

    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local-cluster[1, 12, 1024]")
      .set("spark.executor.cores", "12")
    conf.set(TASK_GPU_ID.amountConf, "0.08")
    conf.set(WORKER_GPU_ID.amountConf, "1")
    conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath)
    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
    sc = new SparkContext(conf)
    val rdd = sc.range(0, 100, 1, 4)
    var rdd1 = rdd.repartition(3)
    val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
    val rp = new ResourceProfileBuilder().require(treqs).build
    rdd1 = rdd1.withResources(rp)
    assert(rdd1.collect().size === 100)
  }
}
{code}
In the above test, the 3 tasks generated by rdd1 are expected to be executed in sequence, because "new TaskResourceRequests().cpus(1).resource("gpu", 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, those 3 tasks actually run in parallel.

The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. In this case, "gpu.numParts" is initialized with 12 (1/0.08) and won't change even if there's a new task resource request (e.g., resource("gpu", 1.0) in this case). Thus, those 3 tasks are able to run in parallel.
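The arithmetic behind the SPARK-45527 root cause can be illustrated with a toy model. This is a sketch under assumptions and does not reproduce Spark's `ExecutorResourceInfo`: the object `NumPartsSketch`, the function `concurrentTasks`, and the "profile-aware" variant are hypothetical names introduced only for illustration. The one number taken from the report is that numParts is fixed at 12 when the startup task amount is 0.08.

```scala
object NumPartsSketch {
  // Number of slots one resource address is split into, fixed at startup
  // from the configured task amount (floor(1 / 0.08) = 12 in the report).
  def numParts(startupTaskAmount: Double): Int =
    math.floor(1.0 / startupTaskAmount).toInt

  // How many of `tasks` can hold a share of the single GPU at once.
  // profileAware = false: every task takes exactly one slot, whatever it
  //   asked for (the static behaviour described in the report).
  // profileAware = true: a task takes ceil(requested * numParts) slots
  //   (a hypothetical accounting, not necessarily Spark's fix).
  def concurrentTasks(
      startupTaskAmount: Double,
      requestedAmount: Double,
      tasks: Int,
      profileAware: Boolean): Int = {
    val parts = numParts(startupTaskAmount)
    val slotsPerTask =
      if (profileAware) math.ceil(requestedAmount * parts).toInt else 1
    math.min(tasks, parts / slotsPerTask)
  }
}
```

With a startup amount of 0.08 and a per-task request of 1.0, the static accounting lets all 3 tasks share the GPU, while accounting that honours the request admits only one task at a time.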
[jira] [Updated] (SPARK-45527) Task fraction resource request is not expected
[ https://issues.apache.org/jira/browse/SPARK-45527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-45527: - Description: {code:java} test("SPARK-XXX") { import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests} withTempDir { dir => val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""") val conf = new SparkConf() .setAppName("test") .setMaster("local-cluster[1, 12, 1024]") .set("spark.executor.cores", "12") conf.set(TASK_GPU_ID.amountConf, "0.08") conf.set(WORKER_GPU_ID.amountConf, "1") conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath) conf.set(EXECUTOR_GPU_ID.amountConf, "1") sc = new SparkContext(conf) val rdd = sc.range(0, 100, 1, 4) var rdd1 = rdd.repartition(3) val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0) val rp = new ResourceProfileBuilder().require(treqs).build rdd1 = rdd1.withResources(rp) assert(rdd1.collect().size === 100) } } {code} In the above test, the 3 tasks generated by rdd1 are expected to be executed in sequence as we expect "new TaskResourceRequests().cpus(1).resource("gpu", 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, those 3 tasks are run in parallel in fact. The root cause is that ExecutorData#ExecutorResourceInfo#numParts is static. In this case, the "gpu.numParts" is initialized with 12 (1/0.08) and won't change even if there's a new task resource request (e.g., resource("gpu", 1.0) in this case). Thus, those 3 tasks are able to be executed in parallel. 
> Task fraction resource request is not expected > -- > > Key: SPARK-45527 > URL: https://issues.apache.org/jira/browse/SPARK-45527 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1, 3.3.3, 3.4.1, 3.5.0 >Reporter: wuyi >Priority: Major > > > {code:java} > test("SPARK-XXX") { > import org.apache.spark.resource.{ResourceProfileBuilder, > TaskResourceRequests} > withTempDir { dir => > val scriptPath = createTempScriptWithExpectedOutput(dir, > "gpuDiscoveryScript", > """{"name": "gpu","addresses":["0"]}""") > val conf = new SparkConf() > .setAppName("test") > .setMaster("local-cluster[1, 12, 1024]") > .set("spark.executor.cores", "12") > conf.set(TASK_GPU_ID.amountConf, "0.08") > conf.set(WORKER_GPU_ID.amountConf, "1") > conf.set(WORKER_GPU_ID.discoveryScriptConf, scriptPath) > conf.set(EXECUTOR_GPU_ID.amountConf, "1") > sc = new SparkContext(conf) > val rdd = sc.range(0, 100, 1, 4) > var rdd1 = rdd.repartition(3) > val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0) > val rp = new ResourceProfileBuilder().require(treqs).build > rdd1 = rdd1.withResources(rp) > assert(rdd1.collect().size === 100) > } > } {code} > In the above test, the 3 tasks generated by rdd1 are expected to be executed > in sequence as we expect "new TaskResourceRequests().cpus(1).resource("gpu", > 1.0)" should override "conf.set(TASK_GPU_ID.amountConf, "0.08")". However, > those 3 tasks are run in parallel in fact. > The root cause is that
[jira] [Commented] (SPARK-45057) Deadlock caused by rdd replication level of 2
[ https://issues.apache.org/jira/browse/SPARK-45057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769443#comment-17769443 ]

wuyi commented on SPARK-45057:
------------------------------

In the case of "Received UploadBlock request from T1 (blocked by T4)", shouldn't it be blocked by T3?

> Deadlock caused by rdd replication level of 2
> ---------------------------------------------
>
>                 Key: SPARK-45057
>                 URL: https://issues.apache.org/jira/browse/SPARK-45057
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.1
>            Reporter: Zhongwei Zhu
>            Priority: Major
>              Labels: pull-request-available
>
>
> When 2 tasks try to compute the same rdd with a replication level of 2 while
> running on only 2 executors, a deadlock will happen.
> A task only releases the lock after writing to the local machine and
> replicating to the remote executor.
>
> ||Time||Exe 1 (Task Thread T1)||Exe 1 (Shuffle Server Thread T2)||Exe 2 (Task Thread T3)||Exe 2 (Shuffle Server Thread T4)||
> |T0|write lock of rdd| | | |
> |T1| | |write lock of rdd| |
> |T2|replicate -> UploadBlockSync (blocked by T4)| | | |
> |T3| | | |Received UploadBlock request from T1 (blocked by T4)|
> |T4| | |replicate -> UploadBlockSync (blocked by T2)| |
> |T5| |Received UploadBlock request from T3 (blocked by T1)| | |
> |T6|Deadlock|Deadlock|Deadlock|Deadlock|
[jira] [Created] (SPARK-45310) Mapstatus location type changed from external shuffle service to executor after decommission migration
wuyi created SPARK-45310:
-------------------------

             Summary: Mapstatus location type changed from external shuffle service to executor after decommission migration
                 Key: SPARK-45310
                 URL: https://issues.apache.org/jira/browse/SPARK-45310
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.5.0, 3.4.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3
            Reporter: wuyi


When migrating shuffle blocks during decommission, the updated mapstatus location doesn't respect the external shuffle service location when the external shuffle service is enabled.
[jira] [Created] (SPARK-42577) A large stage could run indefinitely due to executor lost
wuyi created SPARK-42577:
-------------------------

             Summary: A large stage could run indefinitely due to executor lost
                 Key: SPARK-42577
                 URL: https://issues.apache.org/jira/browse/SPARK-42577
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.3.2, 3.2.3, 3.1.3, 3.0.3
            Reporter: wuyi


When a stage is extremely large and Spark runs on spot instances or problematic clusters with frequent worker/executor loss, the stage could run indefinitely due to task reruns caused by the executor loss. This happens when the external shuffle service is on and the large stage takes hours to complete: when Spark tries to submit a child stage, it finds that the parent stage (the large one) is missing some partitions, so the large stage has to rerun. When it completes again, it finds new missing partitions for the same reason.

We should add an attempt limit for this kind of scenario.
[jira] [Created] (SPARK-41958) Disallow arbitrary custom classpath with proxy user in cluster mode
wuyi created SPARK-41958: Summary: Disallow arbitrary custom classpath with proxy user in cluster mode Key: SPARK-41958 URL: https://issues.apache.org/jira/browse/SPARK-41958 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.2.3, 3.3.1, 3.1.3, 3.0.3, 2.4.8 Reporter: wuyi To avoid arbitrary classpath in spark cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653781#comment-17653781 ] wuyi commented on SPARK-41497: -- > If I am not wrong, SQL makes very heavy use of accumulators, and so most > stages will end up having them anyway - right ? Right. > I would expect this scenario (even without accumulator) to be fairly low > frequency enough that the cost of extra recomputation might be fine. Agree. So shall we proceed with the improved Option 4 that was proposed by you [~mridulm80] ? [~ivoson] can help with the fix. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. 
> taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. > val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes but all of them are not ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in most cases. However, theoretically, > rdd cache could be reported to the driver right after the driver cleans up > the failed task's caches due to asynchronous communication. So this option > can’t resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same > task: this option can 100% fix the issue. 
The problem is this way can also > affect the case where rdd cache can be reused across the attempts (e.g., when > there is no accumulator operation in the task), which can have perf > regression; > Option 3: Introduce accumulator cache: first, this requires a new framework > for supporting accumulator cache; second, the driver should improve its logic > to distinguish whether the accumulator cache value should be reported to the > user to avoid overcounting. For example, in the case of task retry, the value > should be reported. However, in the case of rdd cache reuse, the value > shouldn’t be reported (should it?); > Option 4: Do task success validation when a task trying to load the rdd > cache: this way defines a rdd cache is only valid/accessible if the task has > succeeded. This way could be either overkill or a bit complex (because > currently Spark would clean up the task state once it’s finished. So we need > to maintain a structure to know if task once succeeded or not. ) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail:
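The undercounting in SPARK-41497, and the effect of the rule proposed in Option 4, can be traced with a toy model that needs no cluster. This is a sketch under assumptions, not Spark code: `AccumulatorRetrySketch`, the in-memory `cache` map, and the `cacheVisibleOnlyAfterSuccess` flag are hypothetical. It assumes, as the description states, that accumulator updates only reach the driver when the task reports success, and that a cache hit skips the task function.

```scala
import scala.collection.mutable

object AccumulatorRetrySketch {
  // Runs one failed-to-report attempt followed by one successful attempt
  // and returns the accumulator value the driver ends up with.
  def run(cacheVisibleOnlyAfterSuccess: Boolean): Long = {
    // partition id -> (cached data, was it produced by a task that succeeded?)
    val cache = mutable.Map.empty[Int, (Seq[Int], Boolean)]
    var driverAcc = 0L

    def attempt(reportSucceeds: Boolean): Unit = {
      var localAcc = 0L // updates collected on the executor for this attempt
      val usable = cache.get(0).filter { case (_, producedBySuccess) =>
        !cacheVisibleOnlyAfterSuccess || producedBySuccess
      }
      val data: Seq[Int] = usable match {
        case Some((cached, _)) =>
          cached // cache hit: the task function, and myAcc.add(100), is skipped
        case None =>
          localAcc += 100 // stands in for myAcc.add(100)
          val computed = (0 until 10).map(_ + 1)
          cache(0) = (computed, false)
          computed
      }
      if (reportSucceeds) {
        driverAcc += localAcc // updates are merged only on reported success
        cache(0) = (data, true)
      }
    }

    attempt(reportSucceeds = false) // first attempt: computes, caches, report is lost
    attempt(reportSucceeds = true)  // retry
    driverAcc
  }
}
```

When the cache left behind by the unreported attempt is reusable, the retry contributes nothing and the driver sees 0; when the cache only becomes visible after a successful task, the retry recomputes and the driver sees 100, at the cost of the extra recomputation discussed in the thread.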
[jira] [Updated] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile
[ https://issues.apache.org/jira/browse/SPARK-41848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-41848: - Priority: Blocker (was: Major) > Tasks are over-scheduled with TaskResourceProfile > - > > Key: SPARK-41848 > URL: https://issues.apache.org/jira/browse/SPARK-41848 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: wuyi >Priority: Blocker > > {code:java} > test("SPARK-XXX") { > val conf = new > SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]") > sc = new SparkContext(conf) > val req = new TaskResourceRequests().cpus(3) > val rp = new ResourceProfileBuilder().require(req).build() > val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x => > Thread.sleep(5000) > x * 2 > }.collect() > assert(res === Array(0, 2)) > } {code} > In this test, tasks are supposed to be scheduled in order since each task > requires 3 cores but the executor only has 4 cores. However, we noticed 2 > tasks are launched concurrently from the logs. > It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset > for task scheduling: > {code:java} > val rpId = taskSet.taskSet.resourceProfileId > val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId) > val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, > conf) {code} > but the ResourceProfile (taskCpus=1) of the executor for updating the free > cores in ExecutorData: > {code:java} > val rpId = executorData.resourceProfileId > val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) > val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) > executorData.freeCores -= taskCpus {code} > which results in the inconsistency of the available cores. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile
[ https://issues.apache.org/jira/browse/SPARK-41848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653739#comment-17653739 ] wuyi commented on SPARK-41848: -- cc [~ivoson] > Tasks are over-scheduled with TaskResourceProfile > - > > Key: SPARK-41848 > URL: https://issues.apache.org/jira/browse/SPARK-41848 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: wuyi >Priority: Major > > {code:java} > test("SPARK-XXX") { > val conf = new > SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]") > sc = new SparkContext(conf) > val req = new TaskResourceRequests().cpus(3) > val rp = new ResourceProfileBuilder().require(req).build() > val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x => > Thread.sleep(5000) > x * 2 > }.collect() > assert(res === Array(0, 2)) > } {code} > In this test, tasks are supposed to be scheduled in order since each task > requires 3 cores but the executor only has 4 cores. However, we noticed 2 > tasks are launched concurrently from the logs. > It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset > for task scheduling: > {code:java} > val rpId = taskSet.taskSet.resourceProfileId > val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId) > val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, > conf) {code} > but the ResourceProfile (taskCpus=1) of the executor for updating the free > cores in ExecutorData: > {code:java} > val rpId = executorData.resourceProfileId > val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) > val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) > executorData.freeCores -= taskCpus {code} > which results in the inconsistency of the available cores. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile
wuyi created SPARK-41848:
-------------------------

             Summary: Tasks are over-scheduled with TaskResourceProfile
                 Key: SPARK-41848
                 URL: https://issues.apache.org/jira/browse/SPARK-41848
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.4.0
            Reporter: wuyi


{code:java}
test("SPARK-XXX") {
  val conf = new SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]")
  sc = new SparkContext(conf)
  val req = new TaskResourceRequests().cpus(3)
  val rp = new ResourceProfileBuilder().require(req).build()

  val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x =>
    Thread.sleep(5000)
    x * 2
  }.collect()
  assert(res === Array(0, 2))
}
{code}
In this test, tasks are supposed to be scheduled in order since each task requires 3 cores but the executor only has 4 cores. However, we noticed from the logs that 2 tasks are launched concurrently.

It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset for task scheduling:
{code:java}
val rpId = taskSet.taskSet.resourceProfileId
val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf)
{code}
but the ResourceProfile (taskCpus=1) of the executor for updating the free cores in ExecutorData:
{code:java}
val rpId = executorData.resourceProfileId
val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorData.freeCores -= taskCpus
{code}
which results in inconsistent accounting of the available cores.
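The mismatch described in SPARK-41848 comes down to checking against one number and deducting another. The following sketch models only that bookkeeping; `FreeCoresSketch` and `launched` are hypothetical names and nothing here is Spark's scheduler code. The values 4, 3 and 1 are the executor cores, the task set's taskCpus, and the executor profile's taskCpus from the report.

```scala
object FreeCoresSketch {
  // Counts how many pending tasks get launched on one executor when the
  // scheduler admits a task if `free >= taskSetCpus` but then subtracts
  // `deductedCpus` from the free cores.
  def launched(
      executorCores: Int,
      taskSetCpus: Int,
      deductedCpus: Int,
      pending: Int): Int = {
    var free = executorCores
    var count = 0
    while (count < pending && free >= taskSetCpus) {
      free -= deductedCpus // the report: executor profile's value is used here
      count += 1
    }
    count
  }
}
```

Deducting 1 core per task leaves 3 cores free after the first launch, so the second 3-core task is admitted as well; deducting the same 3 cores that were used for the admission check leaves 1 core free and the second task waits.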
[jira] [Updated] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled
[ https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-39853: - Fix Version/s: 3.4.0 > Support stage level schedule for standalone cluster when dynamic allocation > is disabled > --- > > Key: SPARK-39853 > URL: https://issues.apache.org/jira/browse/SPARK-39853 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: huangtengfei >Assignee: huangtengfei >Priority: Major > Fix For: 3.4.0 > > > [SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added stage > level schedule support for standalone cluster when dynamic allocation was > enabled, spark would request for executors for different resource profiles. > While when dynamic allocation is disabled, we can also leverage stage level > schedule to schedule tasks based on resource profile(task resource requests) > to executors with default resource profile. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971 ]

wuyi edited comment on SPARK-41497 at 12/14/22 7:31 AM:
--------------------------------------------------------

I'm wondering whether we could refine the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator values (probably external ones only) changed after the rdd computation, the rdd's cache should be marked as usable/visible whether the task succeeds or fails; if accumulator values did change after the rdd computation, the rdd's cache should be marked as usable/visible only when the task succeeds. (Let me think further and see if it's doable.)

was (Author: ngone51):
I'm wondering whether we could refine the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator values (probably external ones only) changed after the rdd computation, the rdd's cache should be marked as usable/visible whether the task succeeds or fails. (Let me think further and see if it's doable.)
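The cache reuse condition proposed in the edited comment on SPARK-41497 can be written as a single predicate. This is only a sketch of the rule as stated in the comment: `AttemptOutcome` and `cacheVisible` are hypothetical names, and how Spark would detect that accumulator values changed is left open, as it is in the thread.

```scala
// Sketch of the proposed reuse rule; hypothetical types, not Spark code.
object CacheReuseRule {
  /** Assumed summary of a finished task attempt that produced an rdd cache block. */
  final case class AttemptOutcome(succeeded: Boolean, accumulatorsChanged: Boolean)

  /**
   * The cached block is usable/visible if no accumulator value changed during
   * the rdd computation, or if the attempt that produced it succeeded.
   */
  def cacheVisible(outcome: AttemptOutcome): Boolean =
    !outcome.accumulatorsChanged || outcome.succeeded
}
```

The only combination that hides the cache is a failed attempt that also changed accumulator values, which is the case where reusing the block would skip an accumulator update.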
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971 ]

wuyi commented on SPARK-41497:
------------------------------

I'm wondering whether we could refine the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator values (probably external ones only) changed after the rdd computation, the rdd's cache should be marked as usable/visible whether the task succeeds or fails. (Let me think further and see if it's doable.)
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646969#comment-17646969 ]

wuyi commented on SPARK-41497:
------------------------------

> do we have a way to do that ?

[~mridulm80] Currently, we only have the mapping between the task and its accumulators. Accumulators are registered to the task via TaskContext.get() when they are deserialized on the executor. If we had a way to know which RDD scope an accumulator belongs to at deserialization time, we could then set up the mapping between the RDD and its accumulators. This would probably be the most difficult part.
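The RDD-to-accumulator mapping discussed in this comment could take roughly the following shape. The sketch assumes the hard part is already solved, namely that the RDD scope is known at the moment an accumulator is registered. `RddAccumulatorRegistry`, `withRddScope` and `register` are hypothetical names, and the single-threaded scope stack merely stands in for per-task state.

```scala
import scala.collection.mutable

// Hypothetical registry; Spark has no such structure according to the comment.
object RddAccumulatorRegistry {
  // rddId -> ids of accumulators registered while that rdd's scope was active
  private val byRdd = mutable.Map.empty[Int, mutable.Set[Long]]
  // Innermost rdd scope first; a single-threaded stand-in for per-task state
  private var scopes: List[Int] = Nil

  /** Runs `body` with `rddId` as the innermost active scope. */
  def withRddScope[T](rddId: Int)(body: => T): T = {
    scopes = rddId :: scopes
    try body finally scopes = scopes.tail
  }

  /** Records the accumulator against the innermost scope, if there is one. */
  def register(accId: Long): Unit =
    scopes.headOption.foreach(id => byRdd.getOrElseUpdate(id, mutable.Set.empty) += accId)

  def accumulatorsOf(rddId: Int): Set[Long] =
    byRdd.get(rddId).map(_.toSet).getOrElse(Set.empty)
}
```

An accumulator registered outside any scope is simply not attributed to an RDD here. Whether that is the right default is a design question the thread does not settle.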
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646919#comment-17646919 ]

wuyi commented on SPARK-41497:
------------------------------

[~mridulm80] For b) and c), shouldn't we allow T2 to use T1's cached result if rdd1's computation doesn't involve any accumulators?
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646639#comment-17646639 ]

wuyi commented on SPARK-41497:
------------------------------

[~mridulm80] Sounds like a better idea than Option 4. But I think it still doesn't work well in a case like this: a task is constructed as `rdd1.cache().rdd2`. If the task fails because of rdd2's computation, I think rdd1's cache should still be reusable.
[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-41497:
-------------------------
Description:
An accumulator can be undercounted when the retried task reads from the rdd cache. See the example below; a complete, reproducible example is available at [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job
  // submitted below. In particular, the failed first attempt succeeds on computation
  // (accumulator accounting, result caching) but fails to report its success status due
  // to a concurrent executor loss. The second task attempt succeeds.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Create an rdd with only one partition so there's only one task, and specify the storage level
  // MEMORY_ONLY_2 so that the rdd result is cached on both executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
    myAcc.add(100)
    iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will pass since the second task attempt will succeed
  assert(rdd.count() === 10)
  // This will fail because `myAcc.add(100)` won't be executed during the second task attempt:
  // the second attempt loads the rdd cache directly instead of executing the task function,
  // so `myAcc.add(100)` is skipped.
  assert(myAcc.value === 100)
}
{code}

We could also hit this issue with decommission even if the rdd has only one copy. For example, decommission could migrate the rdd cache block to another executor (the effect is the same as having 2 copies), and the decommissioned executor could be lost before the task reports its success status to the driver.

The issue is more complicated to fix than expected. I have tried several fixes, but none of them is ideal:

Option 1: Clean up any rdd cache related to the failed task: in practice, this option already fixes the issue in most cases. However, in theory, an rdd cache block could be reported to the driver right after the driver cleans up the failed task's caches, because the communication is asynchronous. So this option can't resolve the issue completely;

Option 2: Disallow rdd cache reuse across the task attempts for the same task: this option fixes the issue completely. The problem is that it also affects the case where the rdd cache could safely be reused across attempts (e.g., when there is no accumulator operation in the task), which can cause a perf regression;

Option 3: Introduce an accumulator cache: first, this requires a new framework to support the accumulator cache; second, the driver would need additional logic to decide whether the cached accumulator value should be reported to the user, to avoid overcounting. For example, in the case of task retry, the value should be reported. However, in the case of rdd cache reuse, the value shouldn't be reported (should it?);

Option 4: Do task success validation when a task tries to load the rdd cache: this defines an rdd cache as valid/accessible only if the task has succeeded. This could be either overkill or a bit complex (because Spark currently cleans up the task state once the task is finished, so we would need to maintain a structure to know whether the task once succeeded or not).

was: Accumulator could be undercounted when the retried task has rdd cache.
See the example below and you could also find the completed and reproducible example at [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] {code:scala} test("SPARK-XXX") { // Set up a cluster with 2 executors val conf = new SparkConf() .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") sc = new SparkContext(conf) // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job // submitted below. In particular, the failed first attempt task would success on computation // (accumulator accounting, result caching) but only fail to report its success status due // to the concurrent executor lost. The second task attempt would success. taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) val myAcc = sc.longAccumulator("myAcc") // Initiate a rdd with only one partition so there's only one task and specify the storage level // with MEMORY_ONLY_2 so that the rdd result will be cached on both two executors. val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => myAcc.add(100) iter.map(x => x + 1) }.persist(StorageLevel.MEMORY_ONLY_2) // This will
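The sequence of events in the description's test can be modelled without a cluster. The sketch below is a toy model, not Spark code: `UndercountModel`, the in-memory `blockCache` and the block key are hypothetical stand-ins. The first attempt computes and caches the partition but its status update never reaches the driver; the second attempt is served from the cache.

```scala
import scala.collection.mutable

// Toy model of the undercounting scenario; hypothetical names, not Spark code.
object UndercountModel {
  /** Returns (number of rows seen by the job, accumulator value seen by the driver). */
  def run(): (Int, Long) = {
    val blockCache = mutable.Map.empty[String, Seq[Int]] // stand-in for the replicated rdd block
    var driverAcc = 0L

    // One task attempt: returns the partition data and the accumulator update it produced.
    def attempt(): (Seq[Int], Long) = blockCache.get("rdd_0_0") match {
      case Some(cached) =>
        (cached, 0L) // cache hit: the task function is skipped, so there is no update
      case None =>
        val data = (0 until 10).map(_ + 1)
        blockCache("rdd_0_0") = data // the cached block outlives the attempt
        (data, 100L) // corresponds to myAcc.add(100)
    }

    attempt() // attempt 1: computes and caches, but its update is lost with the executor
    val (data, update) = attempt() // attempt 2: succeeds, reading from the cache
    driverAcc += update
    (data.size, driverAcc)
  }
}
```

In this model the job still sees all 10 rows, while the driver-side accumulator stays at 0 instead of 100, matching the two assertions in the description.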
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646097#comment-17646097 ]

wuyi commented on SPARK-41497:
------------------------------

[~mridulm80] [~tgraves] [~attilapiros] [~ivoson] Any good ideas?
[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-41497:

Description:
An accumulator can be undercounted when the retried task reads from the rdd cache. See the example below; a complete, reproducible example is at [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job
  // submitted below. In particular, the failed first attempt would succeed on computation
  // (accumulator accounting, result caching) and only fail to report its success status due
  // to the concurrent executor loss. The second task attempt would succeed.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Create an rdd with only one partition so there's only one task, and specify the storage level
  // MEMORY_ONLY_2 so that the rdd result will be cached on both executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
    myAcc.add(100)
    iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will pass since the second task attempt will succeed
  assert(rdd.count() === 10)
  // This will fail because `myAcc.add(100)` won't be executed during the second task attempt:
  // the second attempt loads the rdd cache directly instead of executing the task function,
  // so `myAcc.add(100)` is skipped.
  assert(myAcc.value === 100)
}
{code}

We could also hit this issue with decommission even if the rdd has only one copy. For example, decommission could migrate the rdd cache block to another executor (the result is effectively the same as having 2 copies), and the decommissioned executor could be lost before the task reports its success status to the driver.

The issue is also a bit more complicated to fix than expected. I have tried several fixes, but none of them is ideal:

Option 1: Clean up any rdd cache related to the failed task. In practice, this option already fixes the issue in most cases. However, in theory, an rdd cache block could be reported to the driver right after the driver cleans up the failed task's caches, due to asynchronous communication. So this option can't resolve the issue thoroughly;

Option 2: Disallow rdd cache reuse across the task attempts of the same task. This option fixes the issue 100%. The problem is that it also affects the cases where the rdd cache could safely be reused across attempts (e.g., when there is no accumulator operation in the task), which can cause a perf regression;

Option 3: Introduce an accumulator cache. First, this requires a new framework for supporting the accumulator cache; second, the driver would need to improve its logic to distinguish whether the cached accumulator value should be reported to the user, to avoid overcounting. For example, in the case of task retry, the value should be reported. However, in the case of rdd cache reuse, the value shouldn't be reported (should it?);

Option 4: Validate task success when a task tries to load the rdd cache. This defines an rdd cache block as valid/accessible only if the task that produced it has succeeded. This could be either overkill or a bit complex, because currently Spark cleans up the task state once the task is finished, so we would need to maintain a structure that records whether a task was successful or not.

was: Accumulator could be undercounted when the retried task has rdd cached.
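The undercount described above can be illustrated without a cluster. The following is a minimal stand-alone sketch, not Spark code: `runAttempt`, the `cache` map, and `driverAccValue` are hypothetical stand-ins for the task function, the rdd block cache, and the driver-side accumulator. It assumes, as in the description, that the first attempt's status update (and with it the accumulator update) never reaches the driver.

```scala
import scala.collection.mutable

object AccumulatorUndercountSketch {
  // Hypothetical stand-in for the rdd block cache shared by all attempts of a task.
  private val cache = mutable.Map.empty[Int, Seq[Int]]

  // Runs one attempt; returns (result, accumulator update produced by this attempt).
  def runAttempt(partition: Int): (Seq[Int], Long) =
    cache.get(partition) match {
      case Some(cached) =>
        (cached, 0L)              // cache hit: the task function, and its add(100), is skipped
      case None =>
        val result = (0 until 10).map(_ + 1)
        cache(partition) = result // the result is cached before the status is reported
        (result, 100L)            // corresponds to myAcc.add(100)
    }

  // Returns (row count seen by the job, accumulator value seen by the driver).
  def simulate(): (Int, Long) = {
    cache.clear()
    var driverAccValue = 0L
    // Attempt 1 computes and caches, but its status update is lost with the executor,
    // so its accumulator update is never merged on the driver.
    val lostAttempt = runAttempt(0)
    // Attempt 2 hits the cache, succeeds, and reports an accumulator update of 0.
    val (result, update) = runAttempt(0)
    driverAccValue += update
    (result.size, driverAccValue)
  }
}
```

In this model the job still sees all 10 rows while the driver-side value stays at 0 instead of 100, which mirrors the two assertions in the test above.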
> Accumulator undercounting in the case of retry task with rdd cache
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
> Reporter: wuyi
> Priority: Major
[jira] [Created] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
wuyi created SPARK-41497:

Summary: Accumulator undercounting in the case of retry task with rdd cache
Key: SPARK-41497
URL: https://issues.apache.org/jira/browse/SPARK-41497
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi

Accumulator could be undercounted when the retried task has rdd cached.
[jira] [Created] (SPARK-41469) Task rerun on decommissioned executor can be avoided if shuffle data has migrated
wuyi created SPARK-41469:

Summary: Task rerun on decommissioned executor can be avoided if shuffle data has migrated
Key: SPARK-41469
URL: https://issues.apache.org/jira/browse/SPARK-41469
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3
Reporter: wuyi

Currently, we always rerun a finished shuffle map task if it ever ran on the lost executor. However, when the executor loss is caused by decommission, the shuffle data might already have been migrated, so the task doesn't need to rerun.
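The difference between the current and the proposed behaviour can be sketched as a small decision function. This is only an illustration under stated assumptions: `FinishedMapTask`, `LossReason`, `rerunCurrent`, and `rerunProposed` are hypothetical names, not Spark's scheduler API, and "migrated" is modelled simply as the map output location no longer pointing at the lost executor.

```scala
object RerunDecisionSketch {
  sealed trait LossReason
  case object Decommissioned extends LossReason
  case object Crashed extends LossReason

  // Hypothetical record: where the map task ran, and where its shuffle output lives now.
  final case class FinishedMapTask(ranOn: String, outputLocation: String)

  // Current behaviour as described in the ticket: rerun whenever the task ran on the lost executor.
  def rerunCurrent(task: FinishedMapTask, lostExecutor: String): Boolean =
    task.ranOn == lostExecutor

  // Proposed behaviour: skip the rerun if the loss is a decommission and the output has moved away.
  def rerunProposed(task: FinishedMapTask, lostExecutor: String, reason: LossReason): Boolean = {
    val migrated = reason == Decommissioned && task.outputLocation != lostExecutor
    task.ranOn == lostExecutor && !migrated
  }
}
```

For a task that ran on `e1` and whose output was migrated to `e3`, `rerunCurrent` still returns true, while `rerunProposed` returns false for a decommission and true for a crash.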
[jira] [Created] (SPARK-41460) Introduce IsolatedThreadSafeRpcEndpoint to extend IsolatedRpcEndpoint
wuyi created SPARK-41460:

Summary: Introduce IsolatedThreadSafeRpcEndpoint to extend IsolatedRpcEndpoint
Key: SPARK-41460
URL: https://issues.apache.org/jira/browse/SPARK-41460
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi

Currently, an RpcEndpoint that extends IsolatedRpcEndpoint is always implied to be thread-safe. However, this is not explicit to developers, so people may mistakenly override IsolatedRpcEndpoint#threadCount with a value greater than 1, which breaks thread safety.
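One way to make the single-thread guarantee explicit is to fix `threadCount` in a sub-trait. The sketch below uses simplified stand-in traits defined locally; they are assumptions for illustration and do not reproduce Spark's real `RpcEndpoint` hierarchy or signatures.

```scala
object IsolatedEndpointSketch {
  // Simplified stand-ins for illustration only, not Spark's actual traits.
  trait RpcEndpoint {
    def receive(msg: Any): Unit
  }

  trait IsolatedRpcEndpoint extends RpcEndpoint {
    // Number of threads used to deliver messages to this endpoint; overridable here.
    def threadCount(): Int = 1
  }

  // The thread-safe variant pins threadCount to 1 so subclasses cannot change it.
  trait IsolatedThreadSafeRpcEndpoint extends IsolatedRpcEndpoint {
    final override def threadCount(): Int = 1
  }

  class CounterEndpoint extends IsolatedThreadSafeRpcEndpoint {
    // Unsynchronized state is only safe because a single thread delivers messages.
    private var count = 0
    override def receive(msg: Any): Unit = count += 1
    def received: Int = count
    // override def threadCount(): Int = 4   // would not compile: threadCount is final
  }
}
```

With this shape, an endpoint that really wants several delivery threads has to extend `IsolatedRpcEndpoint` directly, which makes the loss of the thread-safety guarantee a visible choice.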
[jira] [Updated] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost
[ https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-41360:

Summary: Avoid BlockManager re-registration if the executor has been lost (was: Avoid BlockMananger re-registration if the executor has been lost)

> Avoid BlockManager re-registration if the executor has been lost
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
> Reporter: wuyi
> Priority: Major
>
> We should avoid block manager re-registration if the executor has been lost,
> as it's meaningless and harmful, e.g., SPARK-35011
[jira] [Created] (SPARK-41360) Avoid BlockMananger re-registration if the executor has been lost
wuyi created SPARK-41360:

Summary: Avoid BlockMananger re-registration if the executor has been lost
Key: SPARK-41360
URL: https://issues.apache.org/jira/browse/SPARK-41360
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.3.1, 3.2.2, 3.1.3, 3.0.3, 2.4.8
Reporter: wuyi

We should avoid block manager re-registration if the executor has been lost, as it's meaningless and harmful, e.g., SPARK-35011
[jira] [Updated] (SPARK-35011) False active executor in UI that caused by BlockManager reregistration
[ https://issues.apache.org/jira/browse/SPARK-35011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-35011:

Summary: False active executor in UI that caused by BlockManager reregistration (was: Avoid Block Manager registerations when StopExecutor msg is in-flight.)

> False active executor in UI that caused by BlockManager reregistration
>
> Key: SPARK-35011
> URL: https://issues.apache.org/jira/browse/SPARK-35011
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Sumeet
> Assignee: wuyi
> Priority: Major
> Labels: BlockManager, core
> Fix For: 3.3.0
>
> *Note:* This is a follow-up on SPARK-34949: even after the heartbeat fix, the
> driver reports dead executors as alive.
>
> *Problem:*
> I was testing Dynamic Allocation on K8s with about 300 executors. When the
> executors were torn down due to "spark.dynamicAllocation.executorIdleTimeout",
> I noticed that all the executor pods were removed from K8s; however, under the
> "Executors" tab in the Spark UI, I could still see some executors listed as alive.
> [spark.sparkContext.statusTracker.getExecutorInfos.length|https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L100]
> also returned a value greater than 1.
>
> *Cause:*
> * "CoarseGrainedSchedulerBackend" issues an async "StopExecutor" on executorEndpoint
> * "CoarseGrainedSchedulerBackend" removes that executor from the Driver's
> internal data structures and publishes "SparkListenerExecutorRemoved" on the "listenerBus".
> * The Executor has still not processed "StopExecutor" from the Driver
> * The Driver receives a heartbeat from the Executor; since it cannot find the
> "executorId" in its data structures, it responds with
> "HeartbeatResponse(reregisterBlockManager = true)"
> * The "BlockManager" on the Executor re-registers with the "BlockManagerMaster"
> and "SparkListenerBlockManagerAdded" is published on the "listenerBus"
> * The Executor starts processing the "StopExecutor" and exits
> * "AppStatusListener" picks up the "SparkListenerBlockManagerAdded" event and
> updates "AppStatusStore"
> * "statusTracker.getExecutorInfos" consults "AppStatusStore" to get the list
> of executors, which returns the dead executor as alive.
>
> *Proposed Solution:*
> Maintain a cache of recently removed executors on the Driver. During
> registration in BlockManagerMasterEndpoint, if the BlockManager belongs to a
> recently removed executor, return None to indicate that the registration is
> ignored since the executor will be shutting down soon.
> On BlockManagerHeartbeat, if the BlockManager belongs to a recently removed
> executor, return true to indicate that the driver knows about it, thereby
> preventing re-registration.
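The proposed solution can be sketched as follows. This is a minimal model under stated assumptions, not the actual `BlockManagerMasterEndpoint`: the class name, the method names, the `retentionMs` parameter, and the explicit `nowMs` clock argument are all hypothetical and chosen only to keep the example deterministic.

```scala
import scala.collection.mutable

object RemovedExecutorCacheSketch {
  // Hypothetical driver-side model; not Spark's BlockManagerMasterEndpoint.
  final class BlockManagerMasterSketch(retentionMs: Long) {
    private val registered = mutable.Set.empty[String]
    // executorId -> time of removal; stands in for the "recently removed" cache.
    private val recentlyRemoved = mutable.Map.empty[String, Long]

    def executorRemoved(executorId: String, nowMs: Long): Unit = {
      registered -= executorId
      recentlyRemoved(executorId) = nowMs
    }

    private def isRecentlyRemoved(executorId: String, nowMs: Long): Boolean =
      recentlyRemoved.get(executorId).exists(removedAt => nowMs - removedAt <= retentionMs)

    // Returns None when the registration is ignored because the executor was just removed.
    def register(executorId: String, nowMs: Long): Option[String] =
      if (isRecentlyRemoved(executorId, nowMs)) None
      else {
        registered += executorId
        Some(executorId)
      }

    // Returns true when the driver "knows" the block manager, so no re-registration is requested.
    def heartbeat(executorId: String, nowMs: Long): Boolean =
      registered.contains(executorId) || isRecentlyRemoved(executorId, nowMs)
  }
}
```

In this model a late heartbeat from a just-removed executor is answered with true, and a late registration attempt is dropped, so no "block manager added" event would be published for the dead executor within the retention window.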
[jira] [Resolved] (SPARK-40596) Populate ExecutorDecommission with more informative messages
[ https://issues.apache.org/jira/browse/SPARK-40596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-40596.

Assignee: Bo Zhang
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/38030

> Populate ExecutorDecommission with more informative messages
>
> Key: SPARK-40596
> URL: https://issues.apache.org/jira/browse/SPARK-40596
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Bo Zhang
> Assignee: Bo Zhang
> Priority: Major
>
> Currently the message in {{ExecutorDecommission}} is a fixed value
> {{"Executor decommission."}}, and it is the same for all cases, including
> spot instance interruptions and auto-scaling down. We should put a detailed
> message in {{ExecutorDecommission}} to better differentiate those cases.
[jira] [Resolved] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled
[ https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-39853.

Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/37268

> Support stage level schedule for standalone cluster when dynamic allocation is disabled
>
> Key: SPARK-39853
> URL: https://issues.apache.org/jira/browse/SPARK-39853
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: huangtengfei
> Assignee: huangtengfei
> Priority: Major
>
> [SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added
> stage-level scheduling support for standalone clusters when dynamic allocation
> is enabled, in which case Spark requests executors for the different resource
> profiles. When dynamic allocation is disabled, we can also leverage stage-level
> scheduling to schedule tasks based on the resource profile (task resource
> requests) onto executors with the default resource profile.
[jira] [Assigned] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled
[ https://issues.apache.org/jira/browse/SPARK-39853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-39853:

Assignee: huangtengfei

> Support stage level schedule for standalone cluster when dynamic allocation is disabled
>
> Key: SPARK-39853
> URL: https://issues.apache.org/jira/browse/SPARK-39853
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: huangtengfei
> Assignee: huangtengfei
> Priority: Major
[jira] [Commented] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung
[ https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17608268#comment-17608268 ]

wuyi commented on SPARK-40320:

I see. Thanks for the explanation.

> When the Executor plugin fails to initialize, the Executor shows active but
> does not accept tasks forever, just like being hung
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 3.0.0
> Reporter: Mars
> Priority: Major
>
> *Reproduce step:*
> set `spark.plugins=ErrorSparkPlugin`
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (the
> code is abbreviated to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   /**
>    */
>   override def driverPlugin(): DriverPlugin = new ErrorDriverPlugin()
>   /**
>    */
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
>     if (checkingInterval == 1) {
>       throw new UnsatisfiedLinkError("My Exception error")
>     }
>   }
> } {code}
> The Executor is shown as active in the Spark UI, but it is broken and
> doesn't receive any tasks.
> *Root Cause:*
> In `org.apache.spark.rpc.netty.Inbox#safelyCall`, a fatal error
> (`UnsatisfiedLinkError` is a fatal error) is thrown in the method
> `dealWithFatalError`. The `CoarseGrainedExecutorBackend` JVM process is still
> alive, but the communication thread is no longer working (see
> `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the executor
> doesn't receive any messages).
> Some ideas:
> It is very hard to know what happened here without reading the code. The
> Executor is active but it can't do anything, so we are left wondering whether
> the driver or the Executor is the problem. At the very least, the Executor
> status shouldn't be active here, or the Executor should call exitExecutor
> (kill itself).
[jira] [Commented] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung
[ https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600777#comment-17600777 ]

wuyi commented on SPARK-40320:

> Actually the `CoarseGrainedExecutorBackend` JVM process is active but the
> communication thread is no longer working ( please see
> `MessageLoop#receiveLoopRunnable` , `receiveLoop()` was broken, so executor
> doesn't receive any message)

Hmm, shouldn't it bring up a new `receiveLoop()` to serve RPC messages according to [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89]? Why doesn't it? Besides, why doesn't SparkUncaughtExceptionHandler catch the fatal error?

cc [~tgraves] [~mridulm80]

> When the Executor plugin fails to initialize, the Executor shows active but
> does not accept tasks forever, just like being hung
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 3.0.0
> Reporter: Mars
> Priority: Major
[jira] [Assigned] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode
[ https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-39957:

Assignee: Kai-Hsun Chen

> Delay onDisconnected to enable Driver receives ExecutorExitCode
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Kai-Hsun Chen
> Assignee: Kai-Hsun Chen
> Priority: Major
>
> There are two ways to detect executor loss. First, when the RPC fails, the
> function {{onDisconnected}} is triggered. Second, when the executor exits
> with an ExecutorExitCode, the exit code is passed from ExecutorRunner to the
> Driver. These two paths may classify the same case differently. We want to
> determine the ExecutorLossReason from the ExecutorExitCode, so this PR aims
> to make sure the Driver receives the ExecutorExitCode before onDisconnected
> is called.
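The effect of delaying the disconnect handling can be sketched with a small event model. This is an illustration only: `Event`, `Disconnected`, `ExitCode`, and the two `reason...` functions are hypothetical names, the exit code value in the usage note is arbitrary, and the real change works on RPC callbacks and timers rather than on an in-memory sequence.

```scala
object ExecutorLossSketch {
  sealed trait Event
  final case class Disconnected(executorId: String) extends Event
  final case class ExitCode(executorId: String, code: Int) extends Event

  // Without a delay, whichever event arrives first decides the loss reason.
  def reasonWithoutDelay(events: Seq[Event]): String = events.headOption match {
    case Some(ExitCode(_, code)) => s"exit code $code"
    case _                       => "disconnected"
  }

  // With a delay, the disconnect is only acted on after a grace period, so an exit
  // code that arrives within that window takes precedence over the disconnect.
  def reasonWithDelay(eventsWithinDelay: Seq[Event]): String =
    eventsWithinDelay
      .collectFirst { case ExitCode(_, code) => s"exit code $code" }
      .getOrElse("disconnected")
}
```

Given `Seq(Disconnected("1"), ExitCode("1", 52))`, the first function reports a plain disconnect, while the second reports the exit code, which is the classification the ticket asks for.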
[jira] [Resolved] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode
[ https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-39957.

Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/37400

> Delay onDisconnected to enable Driver receives ExecutorExitCode
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Kai-Hsun Chen
> Assignee: Kai-Hsun Chen
> Priority: Major
[jira] [Updated] (SPARK-39957) Delay onDisconnected to enable Driver receives ExecutorExitCode
[ https://issues.apache.org/jira/browse/SPARK-39957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-39957:

Fix Version/s: 3.4.0

> Delay onDisconnected to enable Driver receives ExecutorExitCode
>
> Key: SPARK-39957
> URL: https://issues.apache.org/jira/browse/SPARK-39957
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Kai-Hsun Chen
> Assignee: Kai-Hsun Chen
> Priority: Major
> Fix For: 3.4.0
[jira] [Commented] (SPARK-34788) Spark throws FileNotFoundException instead of IOException when disk is full
[ https://issues.apache.org/jira/browse/SPARK-34788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572506#comment-17572506 ]

wuyi commented on SPARK-34788:

> Why don't you ensure enough disks from the beginning?

On the write side, since Spark usually uses iterators to write data to disk, it's not possible to know the size in advance. On the read side (the error in this ticket is actually raised when reading a file), checking whether the disk is full each time before reading a file seems like overkill to me.

> This goal of this Jira issue seems to aim `cosmetic` change which has not much benefit.

Yea, this won't fix the root cause, but it gives users a better error message.

> Spark throws FileNotFoundException instead of IOException when disk is full
>
> Key: SPARK-34788
> URL: https://issues.apache.org/jira/browse/SPARK-34788
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 3.2.0
> Reporter: wuyi
> Priority: Major
>
> When the disk is full, Spark throws FileNotFoundException instead of IOException with the hint.
It's quite a confusing error to users: > {code:java} > 9/03/26 09:03:45 ERROR ShuffleBlockFetcherIterator: Failed to create input > stream from local block > java.io.IOException: Error in reading > FileSegmentManagedBuffer{file=/local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data, > offset=110254956, length=1875458} > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:111) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:442) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622) > at > org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:98) > at > org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:95) > at > 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) > at org.apache.spark.scheduler.Task.run(Task.scala:112) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at >
[jira] [Commented] (SPARK-34788) Spark throws FileNotFoundException instead of IOException when disk is full
[ https://issues.apache.org/jira/browse/SPARK-34788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572478#comment-17572478 ] wuyi commented on SPARK-34788: -- [~leo wen] are you able to reproduce in your environment? If yes, we could verify the proposal ([http://weblog.janek.org/Archive/2004/12/20/ExceptionWhenWritingToAFu.html] ). > Spark throws FileNotFoundException instead of IOException when disk is full > --- > > Key: SPARK-34788 > URL: https://issues.apache.org/jira/browse/SPARK-34788 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: wuyi >Priority: Major > > When the disk is full, Spark throws FileNotFoundException instead of > IOException with the hint. It's quite a confusing error to users: > {code:java} > 9/03/26 09:03:45 ERROR ShuffleBlockFetcherIterator: Failed to create input > stream from local block > java.io.IOException: Error in reading > FileSegmentManagedBuffer{file=/local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data, > offset=110254956, length=1875458} > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:111) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:442) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622) > at > org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:98) > at > org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:95) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:839) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) > at org.apache.spark.scheduler.Task.run(Task.scala:112) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at 
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.FileNotFoundException: > /local_disk0/spark-c2f26f02-2572-4764-815a-cbba65ddb315/executor-b4b76a4c-788c-4cb6-b904-664a883be1aa/blockmgr-36804371-24fe-4131-a3dc-00b7f98f3a3e/11/shuffle_113_1029_0.data > (No such file or directory) > at
[jira] [Resolved] (SPARK-39062) Add Standalone backend support for Stage Level Scheduling
[ https://issues.apache.org/jira/browse/SPARK-39062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-39062.
---
Fix Version/s: 3.4.0
Assignee: huangtengfei
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36716

> Add Standalone backend support for Stage Level Scheduling
> ---
>
> Key: SPARK-39062
> URL: https://issues.apache.org/jira/browse/SPARK-39062
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: Xingbo Jiang
> Assignee: huangtengfei
> Priority: Major
> Fix For: 3.4.0
>
> We should add Standalone backend support for Stage Level Scheduling:
> * The Master should be able to generate executors for multiple
> ResourceProfiles; currently it only considers available CPUs.
> * The Worker needs to let the executor know about its ResourceProfile.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32170) Improve the speculation for the inefficient tasks by the task metrics.
wuyi updated an issue

Spark / SPARK-32170
Improve the speculation for the inefficient tasks by the task metrics.

Change By: wuyi
Fix Version/s: 3.4.0
[jira] [Resolved] (SPARK-32170) Improve the speculation for the inefficient tasks by the task metrics.
wuyi resolved as Fixed

Issue resolved by https://github.com/apache/spark/pull/36162

Spark / SPARK-32170
Improve the speculation for the inefficient tasks by the task metrics.

Change By: wuyi
Assignee: weixiuli
Resolution: Fixed
Status: Resolved (was: In Progress)
[jira] [Resolved] (SPARK-39152) StreamCorruptedException cause job failure for disk persisted RDD
[ https://issues.apache.org/jira/browse/SPARK-39152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-39152.
---
Fix Version/s: 3.4.0
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36512

> StreamCorruptedException cause job failure for disk persisted RDD
> ---
>
> Key: SPARK-39152
> URL: https://issues.apache.org/jira/browse/SPARK-39152
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager
> Affects Versions: 3.4.0
> Reporter: Attila Zsolt Piros
> Assignee: Attila Zsolt Piros
> Priority: Major
> Fix For: 3.4.0
>
> In case of disk corruption, a disk-persisted RDD block will lead to job
> failure, because the block registration always leads to the same file. So even
> when the task is rescheduled on a different executor, the job will fail.
> *Example*
> First failure (the block is locally available):
> {noformat}
> 22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
> java.io.StreamCorruptedException: invalid stream header:
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>   at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>   at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> {noformat}
> Then the task might be rescheduled on a different executor, but as the block
> is registered to the first BlockManager, the error will be the same:
> {noformat}
> java.io.StreamCorruptedException: invalid stream header:
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
>   at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
>   at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
>   at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
>   at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
> {noformat}
> My idea is to retry the IO operations a few times and, when all of them have
> failed, deregister the block and let the following task recompute it.
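The retry-then-deregister idea in the SPARK-39152 description can be sketched as follows. This is only a minimal illustration under stated assumptions, not the change merged in the pull request: `readWithRetry`, `read` and `deregister` are hypothetical names, and the real BlockManager code paths are not modelled.

```scala
import java.io.IOException
import scala.util.{Failure, Success, Try}

object RetryThenDeregister {
  /**
   * Hypothetical helper: runs `read` up to `maxAttempts` times.
   * If every attempt fails with an IOException (StreamCorruptedException is one),
   * it calls `deregister` and returns None so that a caller could recompute the block.
   */
  def readWithRetry[T](maxAttempts: Int)(read: () => T)(deregister: () => Unit): Option[T] = {
    require(maxAttempts > 0, "maxAttempts must be positive")
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty && attempt < maxAttempts) {
      attempt += 1
      Try(read()) match {
        case Success(value)          => result = Some(value)
        case Failure(_: IOException) => () // IO problem: fall through and retry
        case Failure(other)          => throw other // anything else is not retried
      }
    }
    // All attempts failed: drop the registration so the block is no longer served.
    if (result.isEmpty) deregister()
    result
  }
}
```

A caller would treat `None` as "block unavailable" and fall back to recomputation, which is the behaviour the description asks for.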
[jira] [Resolved] (SPARK-38683) It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection
[ https://issues.apache.org/jira/browse/SPARK-38683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-38683.
---
Fix Version/s: 3.4.0
Assignee: weixiuli
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/36000

> It is unnecessary to release the ShuffleManagedBufferIterator or
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the
> client channel's connection is terminated
> ---
>
> Key: SPARK-38683
> URL: https://issues.apache.org/jira/browse/SPARK-38683
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
> Reporter: weixiuli
> Assignee: weixiuli
> Priority: Major
> Fix For: 3.4.0
>
> It is unnecessary to release the ShuffleManagedBufferIterator or
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the
> client channel's connection is terminated, to reduce I/O operations and
> improve performance for the External Shuffle Service.
[jira] [Updated] (SPARK-38683) It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection
[ https://issues.apache.org/jira/browse/SPARK-38683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-38683:
---
Issue Type: Improvement (was: Bug)

> It is unnecessary to release the ShuffleManagedBufferIterator or
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the
> client channel's connection is terminated
> ---
>
> Key: SPARK-38683
> URL: https://issues.apache.org/jira/browse/SPARK-38683
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
> Reporter: weixiuli
> Priority: Major
>
> It is unnecessary to release the ShuffleManagedBufferIterator or
> ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the
> client channel's connection is terminated, to reduce I/O operations and
> improve performance for the External Shuffle Service.
[jira] [Commented] (SPARK-38468) Use error classes in org.apache.spark.metrics
[ https://issues.apache.org/jira/browse/SPARK-38468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506754#comment-17506754 ]

wuyi commented on SPARK-38468:
---

Shall we close this one? It seems to be a duplicate of https://issues.apache.org/jira/browse/SPARK-38312.

cc [~bozhang] [~cloud_fan]

> Use error classes in org.apache.spark.metrics
> ---
>
> Key: SPARK-38468
> URL: https://issues.apache.org/jira/browse/SPARK-38468
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Bo Zhang
> Priority: Major
[jira] [Commented] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting
[ https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506673#comment-17506673 ]

wuyi commented on SPARK-37481:
---

Backport fix to 3.1/3.0 also done.

> Disappearance of skipped stages mislead the bug hunting
> ---
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2, 3.2.0, 3.3.0
> Reporter: Kent Yao
> Assignee: Kent Yao
> Priority: Major
> Fix For: 3.2.1, 3.3.0
>
> ## With FetchFailedException and Map Stage Retries
> When rerunning spark-sql shell with the original SQL in
> [https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315]
> !https://user-images.githubusercontent.com/8326978/143821530-ff498caa-abce-483d-a24b-315aacf7e0a0.png!
> 1. stage 3 threw FetchFailedException and caused itself and its parent
> stage (stage 2) to retry
> 2. stage 2 was skipped before but its attemptId was still 0, so when its
> retry happened it got removed from `Skipped Stages`
> The DAG of Job 2 doesn't show that stage 2 is skipped anymore.
> !https://user-images.githubusercontent.com/8326978/143824666-6390b64a-a45b-4bc8-b05d-c5abbb28cdef.png!
> Besides, a retried stage usually has a subset of tasks from the original
> stage. If we mark it as an original one, the metrics might lead us into
> pitfalls.
[jira] [Resolved] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations
[ https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-38266.
---
Resolution: Fixed

> UnresolvedException: Invalid call to dataType on unresolved object caused by
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0, 3.2
> Reporter: wuyi
> Assignee: wuyi
> Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
>     val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
>     val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
>     val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
>     df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>     val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>       .select($"df1.tsStr".as("timeStr")).as("df3")
>     // This throws "UnresolvedException: Invalid call to
>     // dataType on unresolved object" instead of "AnalysisException: Column
>     // 'df1.timeStr' does not exist."
>     df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> }
> {code}
[jira] [Assigned] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations
[ https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-38266:
---
Assignee: wuyi

> UnresolvedException: Invalid call to dataType on unresolved object caused by
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0, 3.2
> Reporter: wuyi
> Assignee: wuyi
> Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
>     val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
>     val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
>     val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
>     df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>     val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>       .select($"df1.tsStr".as("timeStr")).as("df3")
>     // This throws "UnresolvedException: Invalid call to
>     // dataType on unresolved object" instead of "AnalysisException: Column
>     // 'df1.timeStr' does not exist."
>     df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> }
> {code}
[jira] [Commented] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations
[ https://issues.apache.org/jira/browse/SPARK-38266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495285#comment-17495285 ]

wuyi commented on SPARK-38266:
---

Issue resolved by https://github.com/apache/spark/pull/35568

> UnresolvedException: Invalid call to dataType on unresolved object caused by
> GetDateFieldOperations
> ---
>
> Key: SPARK-38266
> URL: https://issues.apache.org/jira/browse/SPARK-38266
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0, 3.2
> Reporter: wuyi
> Priority: Major
>
> {code:java}
> test("GetDateFieldOperations should skip unresolved nodes") {
>   withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
>     val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
>     val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
>     val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
>     df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>     val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
>       .select($"df1.tsStr".as("timeStr")).as("df3")
>     // This throws "UnresolvedException: Invalid call to
>     // dataType on unresolved object" instead of "AnalysisException: Column
>     // 'df1.timeStr' does not exist."
>     df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
>   }
> }
> {code}
[jira] [Created] (SPARK-38266) UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations
wuyi created SPARK-38266:
---

Summary: UnresolvedException: Invalid call to dataType on unresolved object caused by GetDateFieldOperations
Key: SPARK-38266
URL: https://issues.apache.org/jira/browse/SPARK-38266
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.2, 3.3.0
Reporter: wuyi

{code:java}
test("GetDateFieldOperations should skip unresolved nodes") {
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
    val df = Seq("1644821603").map(i => (i.toInt, i)).toDF("tsInt", "tsStr")
    val df1 = df.select(df("tsStr").cast("timestamp")).as("df1")
    val df2 = df.select(df("tsStr").cast("timestamp")).as("df2")
    df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
    val df3 = df1.join(df2, $"df1.tsStr" === $"df2.tsStr", "left_outer")
      .select($"df1.tsStr".as("timeStr")).as("df3")
    // This throws "UnresolvedException: Invalid call to
    // dataType on unresolved object" instead of "AnalysisException: Column
    // 'df1.timeStr' does not exist."
    df3.join(df1, year($"df1.timeStr") === year($"df3.tsStr"))
  }
}
{code}
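The test name in SPARK-38266 says the rule "should skip unresolved nodes". A rough picture of what such a guard looks like, using a made-up miniature expression tree: none of the types below are Catalyst classes, and `coerce` is a hypothetical stand-in for the coercion rule, so this shows only the guard pattern and not the actual fix.

```scala
object SkipUnresolvedSketch {
  // Hypothetical miniature expression tree; these are NOT Catalyst classes.
  sealed trait Expr {
    def resolved: Boolean
    def dataType: String
  }

  // Mirrors the failure mode in the report: asking an unresolved node for its type throws.
  final case class UnresolvedAttr(name: String) extends Expr {
    val resolved = false
    def dataType: String =
      throw new IllegalStateException(s"Invalid call to dataType on unresolved object: $name")
  }

  final case class Attr(name: String, dataType: String) extends Expr {
    val resolved = true
  }

  final case class CastToDate(child: Expr) extends Expr {
    def resolved: Boolean = child.resolved
    def dataType: String = "date"
  }

  final case class Year(child: Expr) extends Expr {
    def resolved: Boolean = child.resolved
    def dataType: String = "int"
  }

  // Hypothetical coercion rule: cast timestamp children of Year to date,
  // but leave the node alone while its child is still unresolved.
  def coerce(e: Expr): Expr = e match {
    case y @ Year(child) if !child.resolved           => y // guard: never touch dataType here
    case Year(child) if child.dataType == "timestamp" => Year(CastToDate(child))
    case other                                        => other
  }
}
```

Without the first case, `coerce(Year(UnresolvedAttr("df1.timeStr")))` would call `dataType` on the unresolved child and throw, hiding the more useful "column does not exist" error that a later analysis step could report.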
[jira] [Resolved] (SPARK-37580) Optimize current TaskSetManager abort logic when task failed count reach the threshold
[ https://issues.apache.org/jira/browse/SPARK-37580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-37580.
---
Assignee: wangshengjie
Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34834

> Optimize current TaskSetManager abort logic when task failed count reach the
> threshold
> ---
>
> Key: SPARK-37580
> URL: https://issues.apache.org/jira/browse/SPARK-37580
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.0
> Reporter: wangshengjie
> Assignee: wangshengjie
> Priority: Major
>
> In our production environment, we found a flaw in the TaskSetManager abort
> logic. For example:
> Suppose one task has failed 3 times (the max failure threshold is 4 by
> default), and a retry attempt and a speculative attempt of it are both running.
> One of these 2 attempts succeeds and tries to cancel the other. But if the
> executor running the attempt to be cancelled is lost (OOM in our situation),
> that attempt is marked as failed. When TaskSetManager handles this failed
> attempt, the task has now failed 4 times, so it aborts the stage and causes
> the job to fail.
> I created a patch for this bug and will send the pull request soon.
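The scenario in SPARK-37580 comes down to counting a failure for a task that has already succeeded. A minimal sketch of an abort check that avoids this, under the assumption that per-task success is tracked alongside the failure count: `TaskState` and `shouldAbort` are hypothetical names, not TaskSetManager members, and the merged patch may differ.

```scala
object AbortDecisionSketch {
  // Hypothetical, simplified per-task bookkeeping; not Spark's TaskSetManager state.
  final case class TaskState(numFailures: Int, successful: Boolean)

  /**
   * Abort only when the failure threshold is reached AND no attempt of the task
   * has succeeded. A late failure of a cancelled attempt is then harmless.
   */
  def shouldAbort(state: TaskState, maxTaskFailures: Int): Boolean =
    !state.successful && state.numFailures >= maxTaskFailures
}
```

With the default threshold of 4 mentioned above, `shouldAbort(TaskState(4, successful = true), 4)` is false: the fourth failure came from the attempt that lost its executor after its sibling had already finished.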
[jira] [Resolved] (SPARK-37695) Skip diagnosis ob merged blocks from push-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi resolved SPARK-37695. -- Fix Version/s: 3.2.1 3.3.0 Assignee: Cheng Pan Resolution: Fixed Issue resolved by [https://github.com/apache/spark/pull/34961] > Skip diagnosis ob merged blocks from push-based shuffle > --- > > Key: SPARK-37695 > URL: https://issues.apache.org/jira/browse/SPARK-37695 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Assignee: Cheng Pan >Priority: Major > Fix For: 3.2.1, 3.3.0 > > > Shuffle corruption diagnosis for push-based shuffle hasn't been supported > yet. So we should skip diagnosis on merged blocks, otherwise it could fail: > {code:java} > 21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID > 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion > failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0 > at scala.Predef$.assert(Predef.scala:223) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043) > at > org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307) > at > org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293) > at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) > at java.io.BufferedInputStream.read(BufferedInputStream.java:265) > at java.io.DataInputStream.readInt(DataInputStream.java:387) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120) > at > 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98) > at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) > at > org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at > 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) > at org.apache.spark.scheduler.Task.run(Task.scala:136) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at
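The SPARK-37695 description above says diagnosis should be skipped for merged blocks instead of asserting on the block ID type. A small sketch of that pattern, assuming simplified block ID types: the classes below are hypothetical stand-ins (not the ones in org.apache.spark.storage), and the field order of the chunk ID is an assumption chosen only to reproduce the name seen in the log.

```scala
object DiagnosisGuardSketch {
  // Hypothetical, simplified block IDs for illustration only.
  sealed trait BlockId { def name: String }

  final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
    def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }

  // A chunk of a merged block produced by push-based shuffle (field order assumed).
  final case class ShuffleChunkId(shuffleId: Int, mergeId: Int, reduceId: Int, chunkId: Int)
      extends BlockId {
    def name: String = s"shuffleChunk_${shuffleId}_${mergeId}_${reduceId}_${chunkId}"
  }

  /** Returns a diagnosis message for regular shuffle blocks and None for merged chunks. */
  def diagnoseCorruption(blockId: BlockId): Option[String] = blockId match {
    case b: ShuffleBlockId => Some(s"would diagnose ${b.name}")
    case _: ShuffleChunkId => None // diagnosis not supported yet: skip instead of asserting
  }
}
```

Matching on the type and returning `None` replaces the `assert` that produced "Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0" in the trace.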
[jira] [Updated] (SPARK-37695) Skip diagnosis ob merged blocks from push-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-37695: - Description: Shuffle {code:java} 21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0 at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043) at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308) at scala.Option.map(Option.scala:230) at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307) at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.(UnsafeRowSerializer.scala:120) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110) at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:136) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} was: {code:java} 21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): 
java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0 at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043) at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308) at scala.Option.map(Option.scala:230) at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307) at
[jira] [Updated] (SPARK-37695) Skip diagnosis ob merged blocks from push-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-37695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-37695:
-------------------------
    Description:
Shuffle corruption diagnosis is not yet supported for push-based shuffle, so we should skip diagnosis on merged blocks. Otherwise the diagnosis itself can fail:
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
    at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
    at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

was:
Shuffle
{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
    at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
    at scala.Option.map(Option.scala:230)
    at
[jira] [Created] (SPARK-37695) Skip diagnosis ob merged blocks from push-based shuffle
wuyi created SPARK-37695:

Summary: Skip diagnosis ob merged blocks from push-based shuffle
Key: SPARK-37695
URL: https://issues.apache.org/jira/browse/SPARK-37695
Project: Spark
Issue Type: Sub-task
Components: Shuffle
Affects Versions: 3.2.0, 3.3.0
Reporter: wuyi

{code:java}
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
    at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
    at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

--
This message was sent by Atlassian Jira (v8.20.1#820001)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
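The idea in SPARK-37695 (skip corruption diagnosis for merged blocks instead of asserting on the block type) can be sketched as follows. This is only an illustration under stated assumptions, not the actual patch: the class `MergedBlockDiagnosisSketch`, the `Kind` enum, and the string-prefix classification are hypothetical stand-ins for Spark's block id types, and only the `shuffleChunk_` name is taken from the trace above.

```java
import java.util.Optional;

final class MergedBlockDiagnosisSketch {
    // Hypothetical classification by name prefix; Spark itself distinguishes block id classes.
    enum Kind { SHUFFLE_BLOCK, SHUFFLE_CHUNK, OTHER }

    static Kind kindOf(String blockName) {
        if (blockName.startsWith("shuffleChunk_")) return Kind.SHUFFLE_CHUNK;
        if (blockName.startsWith("shuffle_")) return Kind.SHUFFLE_BLOCK;
        return Kind.OTHER;
    }

    // Diagnoses only plain shuffle blocks; anything else (including merged chunks)
    // is skipped by returning an empty result instead of failing an assertion.
    static Optional<String> diagnoseIfSupported(String blockName) {
        if (kindOf(blockName) == Kind.SHUFFLE_BLOCK) {
            return Optional.of("would diagnose " + blockName);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(diagnoseIfSupported("shuffle_464_12_5645").isPresent());
        System.out.println(diagnoseIfSupported("shuffleChunk_464_0_5645_0").isPresent());
    }
}
```

Returning an empty result for unsupported block kinds keeps the fetch-failure path intact, whereas an assertion turns a diagnosis limitation into a task failure.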
[jira] [Updated] (SPARK-37060) Report driver status does not handle response from backup masters
[ https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-37060:
-------------------------
    Fix Version/s: 3.1.3
                   3.2.1

> Report driver status does not handle response from backup masters
> -----------------------------------------------------------------
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
> Reporter: Mohamadreza Rostami
> Assignee: Mohamadreza Rostami
> Priority: Critical
> Fix For: 3.1.3, 3.2.1, 3.3.0
>
> Since the improvement in SPARK-31486, the 'asyncSendToMasterAndForwardReply' method is used instead of 'activeMasterEndpoint.askSync' to get the status of the driver. Because the driver's status is only available on the active master and 'asyncSendToMasterAndForwardReply' iterates over all of the masters, the client has to handle the responses from the backup masters, which the SPARK-31486 change did not consider. As a result, drivers running in cluster mode on a cluster with multiple masters are affected by this bug. I have created a patch for this bug and will send the pull request soon.
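A minimal sketch of the client-side handling described in SPARK-37060, assuming a simplified reply shape: when every master is asked, only the active master's reply carries the driver status, so replies from backup masters should be ignored rather than treated as the answer. The `Reply` class, its fields, and `resolveDriverState` are hypothetical names for illustration and do not mirror Spark's actual message types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class DriverStatusReplySketch {
    // Hypothetical reply: which master answered, whether it is the active one,
    // and the driver state it knows about (null when it knows none).
    static final class Reply {
        final String master;
        final boolean fromActiveMaster;
        final String driverState;

        Reply(String master, boolean fromActiveMaster, String driverState) {
            this.master = master;
            this.fromActiveMaster = fromActiveMaster;
            this.driverState = driverState;
        }
    }

    // Uses only the active master's reply; backup-master replies are skipped.
    static Optional<String> resolveDriverState(List<Reply> replies) {
        for (Reply reply : replies) {
            if (reply.fromActiveMaster && reply.driverState != null) {
                return Optional.of(reply.driverState);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Reply> replies = Arrays.asList(
            new Reply("master-2", false, null),
            new Reply("master-1", true, "RUNNING"));
        System.out.println(resolveDriverState(replies).orElse("UNKNOWN"));
    }
}
```

With this shape, the order in which masters answer does not matter: a backup master answering first cannot cause the client to conclude that the driver is missing.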
[jira] [Assigned] (SPARK-37060) Report driver status does not handle response from backup masters
[ https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-37060:
----------------------------
    Assignee: Mohamadreza Rostami

> Report driver status does not handle response from backup masters
> -----------------------------------------------------------------
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
> Reporter: Mohamadreza Rostami
> Assignee: Mohamadreza Rostami
> Priority: Critical
[jira] [Resolved] (SPARK-37060) Report driver status does not handle response from backup masters
[ https://issues.apache.org/jira/browse/SPARK-37060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-37060.
--------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34331

> Report driver status does not handle response from backup masters
> -----------------------------------------------------------------
>
> Key: SPARK-37060
> URL: https://issues.apache.org/jira/browse/SPARK-37060
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0
> Reporter: Mohamadreza Rostami
> Assignee: Mohamadreza Rostami
> Priority: Critical
> Fix For: 3.3.0
[jira] [Resolved] (SPARK-37300) TaskSchedulerImpl should ignore task finished event if its task was already finished state
[ https://issues.apache.org/jira/browse/SPARK-37300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-37300.
--------------------------
    Fix Version/s: 3.3.0
         Assignee: hujiahua
       Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34578

> TaskSchedulerImpl should ignore task finished event if its task was already finished state
> -------------------------------------------------------------------------------------------
>
> Key: SPARK-37300
> URL: https://issues.apache.org/jira/browse/SPARK-37300
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.0
> Reporter: hujiahua
> Assignee: hujiahua
> Priority: Major
> Fix For: 3.3.0
>
> `TaskSchedulerImpl` handles task finished events in `handleSuccessfulTask` and `handleFailedTask`, but in some cases the task is already in a finished state, and the task finished event should then be ignored.
> Case description:
> When an executor finishes a task of some stage, the driver receives a StatusUpdate event and handles it. If, at the same time, the driver finds that the executor's heartbeat has timed out, the driver also needs to handle an ExecutorLost event. This is a race condition that can leave TaskSetManager.successful and TaskSetManager.tasksSuccessful with wrong values.
> A more detailed description and discussion can be found at https://issues.apache.org/jira/browse/SPARK-36575 and https://github.com/apache/spark/pull/33872
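The guard proposed in SPARK-37300 can be illustrated with a small state check: a task finished event is applied only while the task is still running and is ignored once the task has already reached a finished state. This is a simplified sketch, not Spark's scheduler code; `TaskFinishGuardSketch`, its `State` enum, and `markLost` are hypothetical names, and only `handleSuccessfulTask` and `tasksSuccessful` echo names from the issue.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskFinishGuardSketch {
    enum State { RUNNING, FINISHED }

    private final Map<Long, State> states = new HashMap<>();
    private int tasksSuccessful = 0;

    void launch(long tid) {
        states.put(tid, State.RUNNING);
    }

    // Executor-lost path: the task is marked finished (as failed) first.
    void markLost(long tid) {
        states.put(tid, State.FINISHED);
    }

    // Applies the success only if the task is still running.
    // Returns false when the event is ignored because the task already finished.
    boolean handleSuccessfulTask(long tid) {
        if (states.get(tid) != State.RUNNING) {
            return false;
        }
        states.put(tid, State.FINISHED);
        tasksSuccessful++;
        return true;
    }

    int tasksSuccessful() {
        return tasksSuccessful;
    }

    public static void main(String[] args) {
        TaskFinishGuardSketch scheduler = new TaskFinishGuardSketch();
        scheduler.launch(347212402L);
        scheduler.markLost(347212402L);                                    // ExecutorLost handled first
        System.out.println(scheduler.handleSuccessfulTask(347212402L));   // late success is ignored
        System.out.println(scheduler.tasksSuccessful());
    }
}
```

Because the late success is dropped, the success counter is never incremented for a task that the executor-lost path already accounted for.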
[jira] [Commented] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting
[ https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458131#comment-17458131 ]

wuyi commented on SPARK-37481:
------------------------------

The backport of the fix to 3.1/3.0 is still in progress.

> Disappearance of skipped stages mislead the bug hunting
> --------------------------------------------------------
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2, 3.2.0, 3.3.0
> Reporter: Kent Yao
> Assignee: Kent Yao
> Priority: Major
> Fix For: 3.2.0, 3.3.0
>
> #
> ## With FetchFailedException and Map Stage Retries
> When rerunning the spark-sql shell with the original SQL in [https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315]
> !https://user-images.githubusercontent.com/8326978/143821530-ff498caa-abce-483d-a24b-315aacf7e0a0.png!
> 1. stage 3 threw FetchFailedException and caused itself and its parent stage (stage 2) to retry
> 2. stage 2 was skipped before but its attemptId was still 0, so when its retry happened it got removed from `Skipped Stages`
> The DAG of Job 2 no longer shows that stage 2 was skipped.
> !https://user-images.githubusercontent.com/8326978/143824666-6390b64a-a45b-4bc8-b05d-c5abbb28cdef.png!
> Besides, a retried stage usually has only a subset of the tasks of the original stage. If we mark it as an original one, the metrics can be misleading.
[jira] [Resolved] (SPARK-37481) Disappearance of skipped stages mislead the bug hunting
[ https://issues.apache.org/jira/browse/SPARK-37481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-37481.
--------------------------
    Fix Version/s: 3.3.0
                   3.2.0
         Assignee: Kent Yao
       Resolution: Fixed

Issue resolved by https://github.com/apache/spark/pull/34735

> Disappearance of skipped stages mislead the bug hunting
> --------------------------------------------------------
>
> Key: SPARK-37481
> URL: https://issues.apache.org/jira/browse/SPARK-37481
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.2, 3.2.0, 3.3.0
> Reporter: Kent Yao
> Assignee: Kent Yao
> Priority: Major
> Fix For: 3.3.0, 3.2.0
[jira] [Commented] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441796#comment-17441796 ]

wuyi commented on SPARK-36575:
------------------------------

FYI: the fix has been reverted due to test issues.

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 2.3.3
> Reporter: hujiahua
> Assignee: hujiahua
> Priority: Major
> Fix For: 3.3.0
>
> When an executor finishes a task of some stage, the driver receives a `StatusUpdate` event and handles it. If, at the same time, the driver finds that the executor's heartbeat has timed out, the driver also needs to handle an ExecutorLost event. This is a race condition that can cause the task to never be rescheduled and the stage to hang.
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an asynchronous thread to handle the successful task, which means the synchronized lock of `TaskSchedulerImpl` is released prematurely, midway through the handling [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61]. So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous thread then goes on to handle the successful task. This leaves `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong values.
> Then `HeartbeatReceiver.expireDeadHosts` executes `killAndReplaceExecutor`, which causes `TaskSchedulerImpl.executorLost` to be executed twice. `copiesRunning(index) -= 1` is processed in `executorLost`, so executing `executorLost` twice brings `copiesRunning(index)` to -1, which causes the stage to hang.
> Related log when the issue occurs:
> 21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724, partition 4004, ANY, 7994 bytes)
> 21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 140830 ms
> 21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): ExecutorLostFailure (executor 366724 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 140830 ms
> 21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: Executor 366724 on 10.109.89.3 killed by driver.
> 21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor lost: 366724 (epoch 417416)
> 21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] BlockManagerMasterEndpoint: Trying to remove executor 366724 from BlockManagerMaster.
> 21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 10.109.89.3, 43402, None)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: Removed 366724 successfully in removeExecutor
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle files lost for executor: 366724 (epoch 417416)
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor lost: 366724 (epoch 417473)
> 21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] BlockManagerMasterEndpoint: Trying to remove executor 366724 from BlockManagerMaster.
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: Removed 366724 successfully in removeExecutor
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle files lost for executor: 366724 (epoch 417473)
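The double decrement described in SPARK-36575 can be reproduced in miniature. The sketch below contrasts an unguarded `executorLost` that decrements a per-task counter on every call with a guarded variant that processes each executor only once. It is an illustration under stated assumptions: the class `ExecutorLostGuardSketch`, the `lostExecutors` set, and the single-task layout are hypothetical, and only the `copiesRunning` counter and the `executorLost` name come from the issue text.

```java
import java.util.HashSet;
import java.util.Set;

final class ExecutorLostGuardSketch {
    private final int[] copiesRunning;
    private final Set<String> lostExecutors = new HashSet<>();

    ExecutorLostGuardSketch(int numTasks) {
        copiesRunning = new int[numTasks];
    }

    void taskStarted(int index) {
        copiesRunning[index]++;
    }

    // Unguarded: every call decrements, so two calls for one loss drive the counter to -1.
    void executorLostUnguarded(int index) {
        copiesRunning[index]--;
    }

    // Guarded: an executor that was already processed is ignored on later calls.
    void executorLost(String executorId, int index) {
        if (!lostExecutors.add(executorId)) {
            return;
        }
        copiesRunning[index]--;
    }

    int running(int index) {
        return copiesRunning[index];
    }

    public static void main(String[] args) {
        ExecutorLostGuardSketch unguarded = new ExecutorLostGuardSketch(1);
        unguarded.taskStarted(0);
        unguarded.executorLostUnguarded(0);
        unguarded.executorLostUnguarded(0);
        System.out.println("unguarded copiesRunning = " + unguarded.running(0));

        ExecutorLostGuardSketch guarded = new ExecutorLostGuardSketch(1);
        guarded.taskStarted(0);
        guarded.executorLost("366724", 0);
        guarded.executorLost("366724", 0);
        System.out.println("guarded copiesRunning = " + guarded.running(0));
    }
}
```

Tracking already-processed executors is just one possible way to make the handler idempotent; the thread above does not say which approach the actual change took.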
[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-36575:
----------------------------
    Assignee: hujiahua

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 2.3.3
> Reporter: hujiahua
> Assignee: hujiahua
> Priority: Major
> Fix For: 3.3.0
[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-36575:
----------------------------
    Assignee:     (was: wuyi)

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 2.3.3
> Reporter: hujiahua
> Priority: Major
> Fix For: 3.3.0
[jira] [Commented] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441486#comment-17441486 ]

wuyi commented on SPARK-36575:
------------------------------

Issue resolved by [https://github.com/apache/spark/pull/33872].

To clarify, the change does not actually fix a hang; it is an improvement. The hang does not exist in the master branch, only in 2.3 (confirmed).

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
> Key: SPARK-36575
> URL: https://issues.apache.org/jira/browse/SPARK-36575
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.3.3
> Reporter: hujiahua
> Assignee: wuyi
> Priority: Major
[jira] [Updated] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-36575:
-------------------------
    Issue Type: Improvement  (was: Bug)

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
>                 Key: SPARK-36575
>                 URL: https://issues.apache.org/jira/browse/SPARK-36575
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: 2.3.3
>            Reporter: hujiahua
>            Assignee: wuyi
>            Priority: Major
>
> When an executor finishes a task of some stage, the driver receives a
> `StatusUpdate` event to handle it. If the driver finds at the same time that
> the executor's heartbeat has timed out, it also has to handle an ExecutorLost
> event. There is a race condition between the two, which can leave the task
> never rescheduled and the stage hanging.
>
> The problem is that `TaskResultGetter.enqueueSuccessfulTask` uses an
> asynchronous thread to handle a successful task, which means the synchronized
> lock of `TaskSchedulerImpl` is released midway through the handling
> [https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L61].
> So `TaskSchedulerImpl` may handle executorLost first, and the asynchronous
> thread then goes on to handle the successful task. This leaves
> `TaskSetManager.successful` and `TaskSetManager.tasksSuccessful` with wrong
> values.
>
> Then `HeartbeatReceiver.expireDeadHosts` executes `killAndReplaceExecutor`,
> which causes `TaskSchedulerImpl.executorLost` to be executed twice. Since
> `copiesRunning(index) -= 1` is processed in `executorLost`, running
> `executorLost` twice takes `copiesRunning(index)` to -1, which leads the
> stage to hang.
>
> Related log from when the issue occurred:
> 21/08/05 02:58:14,784 INFO [dispatcher-event-loop-8] TaskSetManager: Starting task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724, partition 4004, ANY, 7994 bytes)
> 21/08/05 03:00:24,126 ERROR [dispatcher-event-loop-4] TaskSchedulerImpl: Lost executor 366724 on 10.109.89.3: Executor heartbeat timed out after 140830 ms
> 21/08/05 03:00:24,218 WARN [dispatcher-event-loop-4] TaskSetManager: Lost task 4004.0 in stage 1328625.0 (TID 347212402, 10.109.89.3, executor 366724): ExecutorLostFailure (executor 366724 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 140830 ms
> 21/08/05 03:00:24,542 INFO [task-result-getter-2] TaskSetManager: Finished task 4004.0 in stage 1328625.0 (TID 347212402) in 129758 ms on 10.109.89.3 (executor 366724) (3047/5400)
> 21/08/05 03:00:34,621 INFO [dispatcher-event-loop-8] TaskSchedulerImpl: Executor 366724 on 10.109.89.3 killed by driver.
> 21/08/05 03:00:34,771 INFO [spark-listener-group-executorManagement] ExecutorMonitor: Executor 366724 removed (new total is 793)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Executor lost: 366724 (epoch 417416)
> 21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] BlockManagerMasterEndpoint: Trying to remove executor 366724 from BlockManagerMaster.
> 21/08/05 03:00:42,360 INFO [dispatcher-event-loop-14] BlockManagerMasterEndpoint: Removing block manager BlockManagerId(366724, 10.109.89.3, 43402, None)
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] BlockManagerMaster: Removed 366724 successfully in removeExecutor
> 21/08/05 03:00:42,360 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle files lost for executor: 366724 (epoch 417416)
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Executor lost: 366724 (epoch 417473)
> 21/08/05 03:00:44,584 INFO [dispatcher-event-loop-15] BlockManagerMasterEndpoint: Trying to remove executor 366724 from BlockManagerMaster.
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] BlockManagerMaster: Removed 366724 successfully in removeExecutor
> 21/08/05 03:00:44,584 INFO [dag-scheduler-event-loop] DAGScheduler: Shuffle files lost for executor: 366724 (epoch 417473)
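
The arithmetic behind the hang reported in SPARK-36575 can be sketched in a few lines of Scala. This is only a minimal model under stated assumptions, not Spark's code: `TaskBookkeeping`, `lateSuccess`, `executorLostUnguarded`, `executorLostGuarded` and `isSchedulable` are hypothetical names, and the rule that a task is only re-offered when its running-copy count is exactly zero is an assumption taken from the issue's claim that -1 makes the stage hang. The guarded variant is shown for contrast and is not necessarily the fix that was merged.

```scala
// Hypothetical, simplified model of per-task bookkeeping.
// This is NOT Spark's TaskSetManager; field names only mirror the issue text.
final class TaskBookkeeping(numTasks: Int) {
  val copiesRunning: Array[Int] = Array.fill(numTasks)(0)
  val successful: Array[Boolean] = Array.fill(numTasks)(false)

  // A task attempt starts running.
  def launch(index: Int): Unit = copiesRunning(index) += 1

  // The asynchronous result-getter thread reports success after the
  // executor has already been declared lost.
  def lateSuccess(index: Int): Unit = successful(index) = true

  // Unguarded loss handler: decrements on every call, as the issue describes.
  def executorLostUnguarded(index: Int): Unit = {
    successful(index) = false
    copiesRunning(index) -= 1
  }

  // Guarded variant (illustration only): never drops below zero.
  def executorLostGuarded(index: Int): Unit = {
    successful(index) = false
    if (copiesRunning(index) > 0) copiesRunning(index) -= 1
  }

  // Assumed scheduling rule: re-offer only when no copy is running
  // and the task has not succeeded.
  def isSchedulable(index: Int): Boolean =
    copiesRunning(index) == 0 && !successful(index)
}

object ExecutorLostTwiceDemo {
  def main(args: Array[String]): Unit = {
    val unguarded = new TaskBookkeeping(1)
    unguarded.launch(0)
    unguarded.executorLostUnguarded(0) // heartbeat timeout
    unguarded.lateSuccess(0)           // async success arrives afterwards
    unguarded.executorLostUnguarded(0) // killAndReplaceExecutor path
    // prints "unguarded: copiesRunning=-1 schedulable=false"
    println(s"unguarded: copiesRunning=${unguarded.copiesRunning(0)} " +
      s"schedulable=${unguarded.isSchedulable(0)}")

    val guarded = new TaskBookkeeping(1)
    guarded.launch(0)
    guarded.executorLostGuarded(0)
    guarded.lateSuccess(0)
    guarded.executorLostGuarded(0)
    // prints "guarded: copiesRunning=0 schedulable=true"
    println(s"guarded: copiesRunning=${guarded.copiesRunning(0)} " +
      s"schedulable=${guarded.isSchedulable(0)}")
  }
}
```

In the unguarded run the counter ends at -1, so under the assumed rule the task is never offered again, which matches the hang described in the report.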
[jira] [Updated] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-36575:
-------------------------
    Fix Version/s: 3.3.0

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
>                 Key: SPARK-36575
>                 URL: https://issues.apache.org/jira/browse/SPARK-36575
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: 2.3.3
>            Reporter: hujiahua
>            Assignee: wuyi
>            Priority: Major
>             Fix For: 3.3.0
[jira] [Assigned] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi reassigned SPARK-36575:
----------------------------
    Assignee: wuyi

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
>                 Key: SPARK-36575
>                 URL: https://issues.apache.org/jira/browse/SPARK-36575
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.3.3
>            Reporter: hujiahua
>            Assignee: wuyi
>            Priority: Major
[jira] [Resolved] (SPARK-36575) Executor lost may cause spark stage to hang
[ https://issues.apache.org/jira/browse/SPARK-36575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-36575.
--------------------------
    Resolution: Fixed

> Executor lost may cause spark stage to hang
> -------------------------------------------
>
>                 Key: SPARK-36575
>                 URL: https://issues.apache.org/jira/browse/SPARK-36575
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.3.3
>            Reporter: hujiahua
>            Priority: Major
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421971#comment-17421971 ]

wuyi commented on SPARK-18105:
------------------------------

[~vladimir.prus] Hi, could you also file a sub-task under https://issues.apache.org/jira/browse/SPARK-20624? That sounds like an issue in decommission.

> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Davies Liu
>            Priority: Major
>         Attachments: TestWeightedGraph.java
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager
[ https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-36700.
--------------------------
    Fix Version/s: 3.3.0
                   3.0.4
                   3.1.3
                   3.2.0
         Assignee: wuyi
       Resolution: Fixed

> BlockManager re-registration is broken due to deferred removal of BlockManager
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-36700
>                 URL: https://issues.apache.org/jira/browse/SPARK-36700
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>            Reporter: wuyi
>            Assignee: wuyi
>            Priority: Blocker
>             Fix For: 3.2.0, 3.1.3, 3.0.4, 3.3.0
>
> Due to the deferred removal of BlockManager (introduced in SPARK-35011), an
> expected BlockManager re-registration could be refused as the inactive
> BlockManager still exists in the map `blockManagerInfo`:
> https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551
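
The failure mode described in SPARK-36700 can be illustrated with a small Scala model. Everything here is a sketch under assumptions: `BmId`, `BmInfo`, `MasterRegistry` and its methods are hypothetical stand-ins, not the real `BlockManagerMasterEndpoint`, and "deferred removal" is modelled simply as flipping an `active` flag while the entry stays in the map. The second registration variant is shown only for contrast and is not claimed to be the change that resolved the issue.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the identifiers and per-manager state kept by the master.
final case class BmId(executorId: String, host: String, port: Int)
final class BmInfo { var active: Boolean = true }

final class MasterRegistry {
  // Mirrors the role of the `blockManagerInfo` map named in the issue.
  private val blockManagerInfo = mutable.HashMap.empty[BmId, BmInfo]

  // Registration that only checks whether the key is present:
  // a stale, inactive entry is enough to refuse a re-registration.
  def registerPresenceOnly(id: BmId): Boolean =
    if (blockManagerInfo.contains(id)) false
    else { blockManagerInfo(id) = new BmInfo; true }

  // Contrast variant (illustration only): an inactive entry is replaced.
  def registerReplacingInactive(id: BmId): Boolean =
    blockManagerInfo.get(id) match {
      case Some(info) if info.active => false
      case _ =>
        blockManagerInfo(id) = new BmInfo
        true
    }

  // Deferred removal as modelled here: mark inactive, keep the entry.
  def removeDeferred(id: BmId): Unit =
    blockManagerInfo.get(id).foreach(info => info.active = false)

  def isActive(id: BmId): Boolean = blockManagerInfo.get(id).exists(_.active)
}

object ReRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val id = BmId("1", "10.0.0.1", 43402)

    val strict = new MasterRegistry
    strict.registerPresenceOnly(id)
    strict.removeDeferred(id)
    // prints "presence-only: accepted=false active=false"
    println(s"presence-only: accepted=${strict.registerPresenceOnly(id)} " +
      s"active=${strict.isActive(id)}")

    val lenient = new MasterRegistry
    lenient.registerReplacingInactive(id)
    lenient.removeDeferred(id)
    // prints "replace-inactive: accepted=true active=true"
    println(s"replace-inactive: accepted=${lenient.registerReplacingInactive(id)} " +
      s"active=${lenient.isActive(id)}")
  }
}
```

With the presence-only check, the executor's BlockManager stays unusable after it tries to come back, which is the refused re-registration the report describes.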
[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager
[ https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413865#comment-17413865 ]

wuyi commented on SPARK-36700:
------------------------------

Reverted by [https://github.com/apache/spark/pull/33942] and backported to 3.2, 3.1, 3.0.

> BlockManager re-registration is broken due to deferred removal of BlockManager
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-36700
>                 URL: https://issues.apache.org/jira/browse/SPARK-36700
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker
[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager
[ https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412290#comment-17412290 ]

wuyi commented on SPARK-36700:
------------------------------

I'm working on the fix.

> BlockManager re-registration is broken due to deferred removal of BlockManager
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-36700
>                 URL: https://issues.apache.org/jira/browse/SPARK-36700
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker
[jira] [Commented] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager
[ https://issues.apache.org/jira/browse/SPARK-36700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412285#comment-17412285 ]

wuyi commented on SPARK-36700:
------------------------------

cc [~mridulm80] [~sumeet.gajjar] [~attilapiros]

cc [~gengliang] for the blocker fyi

> BlockManager re-registration is broken due to deferred removal of BlockManager
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-36700
>                 URL: https://issues.apache.org/jira/browse/SPARK-36700
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker
[jira] [Created] (SPARK-36700) BlockManager re-registration is broken due to deferred removal of BlockManager
wuyi created SPARK-36700:
----------------------------

             Summary: BlockManager re-registration is broken due to deferred removal of BlockManager
                 Key: SPARK-36700
                 URL: https://issues.apache.org/jira/browse/SPARK-36700
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.2, 3.0.3, 3.2.0, 3.3.0
            Reporter: wuyi

Due to the deferred removal of BlockManager (introduced in SPARK-35011), an expected BlockManager re-registration could be refused as the inactive BlockManager still exists in the map `blockManagerInfo`:

https://github.com/apache/spark/blob/9cefde8db373a3433b7e3ce328e4a2ce83b1aca2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L551
[jira] [Created] (SPARK-36614) Executor loss reason shows "worker lost" rather "Executor decommission"
wuyi created SPARK-36614:
----------------------------

             Summary: Executor loss reason shows "worker lost" rather "Executor decommission"
                 Key: SPARK-36614
                 URL: https://issues.apache.org/jira/browse/SPARK-36614
             Project: Spark
          Issue Type: Sub-task
          Components: Spark Core
    Affects Versions: 3.1.2, 3.2.0, 3.3.0
            Reporter: wuyi
         Attachments: WeChat13c9f1345a096ff83d193e4e9853b165.png

For a lost executor caused by decommission, the loss reason shows "worker lost", while it is actually due to decommission.
[jira] [Updated] (SPARK-36614) Executor loss reason shows "worker lost" rather "Executor decommission"
[ https://issues.apache.org/jira/browse/SPARK-36614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-36614:
-------------------------
    Attachment: WeChat13c9f1345a096ff83d193e4e9853b165.png

> Executor loss reason shows "worker lost" rather "Executor decommission"
> -----------------------------------------------------------------------
>
>                 Key: SPARK-36614
>                 URL: https://issues.apache.org/jira/browse/SPARK-36614
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.1.2, 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Major
>         Attachments: WeChat13c9f1345a096ff83d193e4e9853b165.png
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406622#comment-17406622 ]

wuyi edited comment on SPARK-18105 at 8/30/21, 8:31 AM:
--------------------------------------------------------

FYI, for users who hit the "Stream is corrupted" error in 3.0.0, please try to apply the fix of SPARK-32658 first as it's a known bug that can cause the error.

was (Author: ngone51):
FYI, for users who hit the "Stream is corrupted" error, please try to apply the fix of SPARK-32658 first as it's a known bug that can cause the error.

> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Davies Liu
>            Priority: Major
>         Attachments: TestWeightedGraph.java
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406622#comment-17406622 ]

wuyi commented on SPARK-18105:
------------------------------

FYI, for users who hit the "Stream is corrupted" error, please try to apply the fix of SPARK-32658 first as it's a known bug that can cause the error.

> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Davies Liu
>            Priority: Major
>         Attachments: TestWeightedGraph.java
[jira] [Commented] (SPARK-36196) Spark FetchFailedException Stream is corrupted Error
[ https://issues.apache.org/jira/browse/SPARK-36196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406620#comment-17406620 ] wuyi commented on SPARK-36196: -- Hi [~arghya18], did you try applying the fix from https://issues.apache.org/jira/browse/SPARK-32658? > Spark FetchFailedException Stream is corrupted Error > > > Key: SPARK-36196 > URL: https://issues.apache.org/jira/browse/SPARK-36196 > Project: Spark > Issue Type: Bug > Components: Kubernetes, PySpark, Spark Core > Affects Versions: 3.1.1 > Environment: Spark on K8s > Reporter: Arghya Saha > Priority: Major > > I am running Spark on K8s. Thousands of jobs run every day, but a few > fail each day (not the same jobs) with the exception below. > They succeed on retry. I have read about this error in multiple Jira issues and saw that it > was resolved in Spark 3.0.0, but I am still getting the error with a higher > version. > {code:java} > org.apache.spark.shuffle.FetchFailedException: Stream is corrupted > org.apache.spark.shuffle.FetchFailedException: Stream is corrupted at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:770) > at > org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845) > at java.base/java.io.BufferedInputStream.fill(Unknown Source) at > java.base/java.io.BufferedInputStream.read1(Unknown Source) at > java.base/java.io.BufferedInputStream.read(Unknown Source) at > java.base/java.io.DataInputStream.read(Unknown Source) at > org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899) at > org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733) at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:494) at > 
scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) > at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown > Source) at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown > Source) at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:817) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.(SortMergeJoinExec.scala:687) > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:197) > at > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at > org.apache.spark.scheduler.Task.run(Task.scala:131) at > 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) at java.base/java.lang.Thread.run(Unknown Source)Caused by: > java.io.IOException: Stream is corrupted at > net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:250) at > net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at > org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841) > ... 38 moreCaused by: net.jpountz.lz4.LZ4Exception: Error decoding offset
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403556#comment-17403556 ] wuyi commented on SPARK-36558: -- Discussed with [~vsowrirajan] offline. The issue won't happen in production code, since "runningStages" is kept in sync within the single DAGSchedulerEventProcessLoop thread. Closing as Won't Fix. > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Affects Versions: 3.2.0, 3.3.0 > Reporter: wuyi > Priority: Blocker > > > A stage whose tasks have all finished but whose finalization is still ongoing can > cause the job to hang. The problem is that such a stage is considered a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]). > This breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks in > stage A have finished, stage A is skipped when stage B is submitted. > With this bug, however, stage A is submitted. Submitting a stage > with no tasks to run cannot add its child stage to the > waiting stage list, which causes the job to hang. 
>
> The example to reproduce:
> First, change `MyRDD` to allow it to compute:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
>   Iterator.single((1, 1))
> }
> {code}
> Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // By this, we can mimic a stage with all tasks finished
>       // but finalization is incomplete.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=after wait==")
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
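The hang described in the quoted issue can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyStage`, `ToyScheduler` and `HangDemo` are hypothetical names, none of this is Spark's DAGScheduler API, and the ordering it assumes (parents are submitted before the child is parked in the waiting list) is one possible reading of the description rather than a confirmed account of the scheduler's code.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not Spark's DAGScheduler API.
final case class ToyStage(id: Int, parents: List[ToyStage], pendingTasks: Int, finalized: Boolean) {
  // Assumption: a stage is "missing" if it has tasks left OR its finalization is incomplete.
  def isMissing: Boolean = pendingTasks > 0 || !finalized
}

final class ToyScheduler {
  val waiting = mutable.Map.empty[Int, ToyStage]
  val running = mutable.Set.empty[Int]
  val finished = mutable.Set.empty[Int]

  def submit(stage: ToyStage): Unit = {
    val missing = stage.parents.filter(_.isMissing)
    if (missing.isEmpty) {
      launch(stage)
    } else {
      missing.foreach(submit)    // parents are submitted first ...
      waiting(stage.id) = stage  // ... and only afterwards is the child parked
    }
  }

  // A stage with nothing to run completes on the spot.
  private def launch(stage: ToyStage): Unit =
    if (stage.pendingTasks > 0) running += stage.id else complete(stage)

  // Marks a stage finished and wakes the children that are waiting at this moment.
  def complete(stage: ToyStage): Unit = {
    running -= stage.id
    finished += stage.id
    val ready = waiting.values.filter(_.parents.exists(_.id == stage.id)).toList
    ready.foreach { child =>
      waiting -= child.id
      launch(child)
    }
  }
}

object HangDemo {
  // Parent has no tasks left but is not finalized: it completes before the child is parked.
  def zeroTaskParent(): (Set[Int], Set[Int]) = {
    val parent = ToyStage(0, Nil, pendingTasks = 0, finalized = false)
    val child = ToyStage(1, List(parent), pendingTasks = 1, finalized = true)
    val s = new ToyScheduler
    s.submit(child)
    (s.waiting.keySet.toSet, s.running.toSet)
  }

  // Parent still has tasks: its later completion wakes the parked child.
  def parentWithTasks(): (Set[Int], Set[Int]) = {
    val parent = ToyStage(0, Nil, pendingTasks = 3, finalized = false)
    val child = ToyStage(1, List(parent), pendingTasks = 1, finalized = true)
    val s = new ToyScheduler
    s.submit(child)
    s.complete(parent)
    (s.waiting.keySet.toSet, s.running.toSet)
  }
}
```

In `zeroTaskParent` the child stays in `waiting` while nothing is running, which is the toy equivalent of the hang. In `parentWithTasks` the child ends up running because the parent's completion arrives after the child was parked.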
[jira] [Resolved] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi resolved SPARK-36558. -- Resolution: Won't Fix > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Affects Versions: 3.2.0, 3.3.0 > Reporter: wuyi > Priority: Blocker
[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-36558: - Description: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: First, change `MyRDD` to allow it to compute: {code:java} override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = { Iterator.single((1, 1)) }{code} Then run this test: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} was: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. 
The example to reproduce: First, change `MyRDD` to allow it compute: override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = \{ Iterator.single((1, 1)) } Then run this test: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang >
[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-36558: - Description: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: First, change `MyRDD` to allow it compute: override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = \{ Iterator.single((1, 1)) } Then run this test: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} was: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. 
The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 >
[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-36558: - Description: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // scalastyle:off println("=after wait==") // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} was: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. 
The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403456#comment-17403456 ] wuyi commented on SPARK-36558: -- [~vsowrirajan] Sorry, I left out one change to `MyRDD` that the test needs. `MyRDD` must be allowed to compute: {code:java} override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = { Iterator.single((1, 1)) }{code} Could you help verify it again? > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Affects Versions: 3.2.0, 3.3.0 > Reporter: wuyi > Priority: Blocker > > > A stage whose tasks have all finished but whose finalization is still ongoing can > cause the job to hang. The problem is that such a stage is considered a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]). > This breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks in > stage A have finished, stage A is skipped when stage B is submitted. > With this bug, however, stage A is submitted. Submitting a stage > with no tasks to run cannot add its child stage to the > waiting stage list, which causes the job to hang. 
> > The example to reproduce: > {code:java} > test("Job hang") { > initPushBasedShuffleConfs(conf) > conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) > DAGSchedulerSuite.clearMergerLocs > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", > "host5")) > val latch = new CountDownLatch(1) > val myDAGScheduler = new MyDAGScheduler( > sc, > sc.dagScheduler.taskScheduler, > sc.listenerBus, > sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], > sc.env.blockManager.master, > sc.env) { > override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = > { > // By this, we can mimic a stage with all tasks finished > // but finalization is incomplete. > latch.countDown() > } > } > sc.dagScheduler = myDAGScheduler > sc.taskScheduler.setDAGScheduler(myDAGScheduler) > val parts = 20 > val shuffleMapRdd = new MyRDD(sc, parts, Nil) > val shuffleDep = new ShuffleDependency(shuffleMapRdd, new > HashPartitioner(parts)) > val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = > mapOutputTracker) > reduceRdd1.countAsync() > latch.await() > // set _shuffleMergedFinalized to true can avoid the hang. > // shuffleDep._shuffleMergedFinalized = true > val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) > reduceRdd2.count() > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36564) LiveRDDDistribution.toApi throws NullPointerException
[ https://issues.apache.org/jira/browse/SPARK-36564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-36564:
    Summary: LiveRDDDistribution.toApi throws NullPointerException  (was: LiveRDD.doUpdate throws NullPointerException)

> LiveRDDDistribution.toApi throws NullPointerException
>
> Key: SPARK-36564
> URL: https://issues.apache.org/jira/browse/SPARK-36564
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Major
>
> {code:java}
> 21/08/23 12:26:29 ERROR AsyncEventQueue: Listener AppStatusListener threw an exception
> java.lang.NullPointerException
>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
>     at com.google.common.collect.MapMakerInternalMap.putIfAbsent(MapMakerInternalMap.java:3507)
>     at com.google.common.collect.Interners$WeakInterner.intern(Interners.java:85)
>     at org.apache.spark.status.LiveEntityHelpers$.weakIntern(LiveEntity.scala:696)
>     at org.apache.spark.status.LiveRDDDistribution.toApi(LiveEntity.scala:563)
>     at org.apache.spark.status.LiveRDD.$anonfun$doUpdate$4(LiveEntity.scala:629)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>     at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>     at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>     at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>     at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at org.apache.spark.status.LiveRDD.doUpdate(LiveEntity.scala:629)
>     at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:51)
>     at org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1206)
>     at org.apache.spark.status.AppStatusListener.maybeUpdate(AppStatusListener.scala:1212)
>     at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6(AppStatusListener.scala:956)
>     at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6$adapted(AppStatusListener.scala:956)
>     at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>     at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>     at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>     at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>     at org.apache.spark.status.AppStatusListener.flush(AppStatusListener.scala:1015)
>     at org.apache.spark.status.AppStatusListener.onExecutorMetricsUpdate(AppStatusListener.scala:956)
>     at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:59)
>     at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
>     at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>     at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>     at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
>     at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
>     at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
>     at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
>     at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
>     at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
>     at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
>     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
>     at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36564) LiveRDD.doUpdate throws NullPointerException
wuyi created SPARK-36564:

Summary: LiveRDD.doUpdate throws NullPointerException
Key: SPARK-36564
URL: https://issues.apache.org/jira/browse/SPARK-36564
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.2, 3.0.3, 3.2.0, 3.3.0
Reporter: wuyi

{code:java}
21/08/23 12:26:29 ERROR AsyncEventQueue: Listener AppStatusListener threw an exception
java.lang.NullPointerException
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
    at com.google.common.collect.MapMakerInternalMap.putIfAbsent(MapMakerInternalMap.java:3507)
    at com.google.common.collect.Interners$WeakInterner.intern(Interners.java:85)
    at org.apache.spark.status.LiveEntityHelpers$.weakIntern(LiveEntity.scala:696)
    at org.apache.spark.status.LiveRDDDistribution.toApi(LiveEntity.scala:563)
    at org.apache.spark.status.LiveRDD.$anonfun$doUpdate$4(LiveEntity.scala:629)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.status.LiveRDD.doUpdate(LiveEntity.scala:629)
    at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:51)
    at org.apache.spark.status.AppStatusListener.update(AppStatusListener.scala:1206)
    at org.apache.spark.status.AppStatusListener.maybeUpdate(AppStatusListener.scala:1212)
    at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6(AppStatusListener.scala:956)
    at org.apache.spark.status.AppStatusListener.$anonfun$onExecutorMetricsUpdate$6$adapted(AppStatusListener.scala:956)
    at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
    at org.apache.spark.status.AppStatusListener.flush(AppStatusListener.scala:1015)
    at org.apache.spark.status.AppStatusListener.onExecutorMetricsUpdate(AppStatusListener.scala:956)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:59)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:119)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:103)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1585)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
{code}
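The top frames of the SPARK-36564 trace show the exception being raised by a null check inside the Guava interner, reached through weakIntern from LiveRDDDistribution.toApi, which suggests that a null value was handed to the interner. The sketch below only illustrates that failure mode and one possible guard, assuming the interned value is a string. StrictInterner and safeIntern are hypothetical stand-ins written for this note; they are neither Spark's nor Guava's code, and this is not necessarily how the issue was fixed.

```scala
import java.util.Objects
import java.util.concurrent.ConcurrentHashMap

object InternSketch {
  // Hypothetical stand-in for an interner that, like the checkNotNull
  // frame in the trace, rejects null input with a NullPointerException.
  object StrictInterner {
    private val pool = new ConcurrentHashMap[String, String]()

    def intern(s: String): String = {
      Objects.requireNonNull(s)
      val previous = pool.putIfAbsent(s, s)
      if (previous == null) s else previous
    }
  }

  // Hypothetical guard: let null pass through untouched instead of
  // handing it to the interner.
  def safeIntern(s: String): String =
    if (s == null) null else StrictInterner.intern(s)
}
```

With this guard, InternSketch.safeIntern(null) returns null, while calling StrictInterner.intern(null) directly still throws.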
[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-36558: - Description: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add its child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} was: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is
[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-36558: - Description: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted. And submitting a stage with no tasks to run would not be able to add child stage into the waiting stage list, which leads to the job hang in the end. The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} was: For a stage that all tasks are finished but with ongoing finalization can lead to job hang. The problem is that such stage is considered as a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] And it breaks the original assumption that a "missing" stage must have tasks to run. Normally, if stage A is the parent of (result) stage B and all tasks have finished in stage A, stage A will be skipped directly when submitting stage B. However, with this bug, stage A will be submitted, which leads to the job hang in the end. The example to reproduce: {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) val myDAGScheduler = new MyDAGScheduler( sc, sc.dagScheduler.taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) { override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. 
latch.countDown() } } sc.dagScheduler = myDAGScheduler sc.taskScheduler.setDAGScheduler(myDAGScheduler) val parts = 20 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.countAsync() latch.await() // set _shuffleMergedFinalized to true can avoid the hang. // shuffleDep._shuffleMergedFinalized = true val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > - > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.2.0, 3.3.0 >Reporter: wuyi >Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is considered as a "missing" > stage (see >
[jira] [Commented] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402942#comment-17402942 ]

wuyi commented on SPARK-36558:

cc [~mridul] [~mshen] [~csingh]

> Stage has all tasks finished but with ongoing finalization can cause job hang
>
> Key: SPARK-36558
> URL: https://issues.apache.org/jira/browse/SPARK-36558
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Blocker
>
> For a stage that all tasks are finished but with ongoing finalization can
> lead to job hang. The problem is that such stage is considered as a "missing"
> stage (see
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]).
> And it breaks the original assumption that a "missing" stage must have tasks
> to run.
> Normally, if stage A is the parent of (result) stage B and all tasks have
> finished in stage A, stage A will be skipped directly when submitting stage
> B. However, with this bug, stage A will be submitted, which leads to the job
> hang in the end.
>
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // By this, we can mimic a stage with all tasks finished
>       // but finalization is incomplete.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // set _shuffleMergedFinalized to true can avoid the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
[jira] [Created] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang
wuyi created SPARK-36558:

Summary: Stage has all tasks finished but with ongoing finalization can cause job hang
Key: SPARK-36558
URL: https://issues.apache.org/jira/browse/SPARK-36558
Project: Spark
Issue Type: Sub-task
Components: Spark Core
Affects Versions: 3.2.0, 3.3.0
Reporter: wuyi

A stage whose tasks have all finished but whose finalization is still ongoing can cause the job to hang. The problem is that such a stage is considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]), which breaks the original assumption that a "missing" stage must have tasks to run.

Normally, if stage A is the parent of (result) stage B and all tasks in stage A have finished, stage A is skipped when stage B is submitted. With this bug, however, stage A is submitted anyway, which eventually causes the job to hang.

An example that reproduces the problem:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // Only counting down here, without finalizing, mimics a stage whose
      // tasks have all finished but whose finalization is incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // Setting _shuffleMergedFinalized to true avoids the hang:
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
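The state described in SPARK-36558 can be pictured with a small toy model. It assumes that the linked DAGScheduler lines treat a parent shuffle map stage as "missing" when its outputs are unavailable or when its merge has not been finalized; the issue only links those lines, so that reading is an assumption. ToyMapStage and MissingStageSketch are hypothetical names and do not exist in Spark.

```scala
// Toy model only, not Spark's scheduler. All names are hypothetical.
final case class ToyMapStage(
    numPartitions: Int,
    finishedPartitions: Set[Int],
    mergeFinalized: Boolean) {

  // True when every partition already has a map output.
  def isAvailable: Boolean = finishedPartitions.size == numPartitions

  // Partitions that would still need a task.
  def partitionsToCompute: Seq[Int] =
    (0 until numPartitions).filterNot(finishedPartitions.contains)
}

object MissingStageSketch {
  // Assumed reading of the linked check: a parent stage is "missing" when
  // its outputs are unavailable OR its merge is not finalized yet.
  def isMissing(stage: ToyMapStage): Boolean =
    !stage.isAvailable || !stage.mergeFinalized

  // The state the issue describes: "missing", yet nothing is left to run.
  def isMissingWithoutTasks(stage: ToyMapStage): Boolean =
    isMissing(stage) && stage.partitionsToCompute.isEmpty
}
```

For a stage with 20 partitions, all finished, and mergeFinalized = false, isMissingWithoutTasks is true; flipping mergeFinalized to true makes the stage no longer missing, which mirrors the commented-out workaround in the reproduction.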
[jira] [Created] (SPARK-36543) Decommission logs too frequent when waiting migration to finish
wuyi created SPARK-36543:

Summary: Decommission logs too frequent when waiting migration to finish
Key: SPARK-36543
URL: https://issues.apache.org/jira/browse/SPARK-36543
Project: Spark
Issue Type: Sub-task
Components: Spark Core
Affects Versions: 3.1.0, 3.2.0, 3.3.0
Reporter: wuyi

{code:java}
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:31 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:32 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:33 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:34 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:35 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:36 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:37 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:38 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:39 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:40 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:41 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:42 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:43 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: Checking to see if we can shutdown.
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: No running tasks, checking migrations
21/08/18 08:14:44 INFO CoarseGrainedExecutorBackend: All blocks not yet migrated.
...
{code}

Migrating data (shuffle or RDD blocks) takes some time, so logging these three messages once per second while waiting is too frequent.
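One possible way to reduce this kind of noise is to keep checking every second but let the message through only once per longer interval. The helper below is a minimal sketch of that idea; LogThrottle is a hypothetical class, not part of Spark, and the interval is left to the caller because the issue does not say what an acceptable frequency would be.

```scala
// Hypothetical helper, not part of Spark: allows a message through at most
// once per `intervalMs`, however often the caller performs its check.
final class LogThrottle(intervalMs: Long) {
  private var lastLoggedAt: Option[Long] = None

  // Returns true when the caller should emit its log line at time `nowMs`.
  def shouldLog(nowMs: Long): Boolean = {
    val due = lastLoggedAt.forall(last => nowMs - last >= intervalMs)
    if (due) lastLoggedAt = Some(nowMs)
    due
  }
}
```

A caller would wrap each log statement in `if (throttle.shouldLog(System.currentTimeMillis()))`. With an assumed interval of 10 seconds, the 14 one-second checks shown in the log above would produce two rounds of messages instead of 14.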
[jira] [Created] (SPARK-36532) Deadlock in CoarseGrainedExecutorBackend.onDisconnected
wuyi created SPARK-36532:

Summary: Deadlock in CoarseGrainedExecutorBackend.onDisconnected
Key: SPARK-36532
URL: https://issues.apache.org/jira/browse/SPARK-36532
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0
Reporter: wuyi

The deadlock has exactly the same root cause as SPARK-14180; it just happens in a different code path.
[jira] [Resolved] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle
[ https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-36530.
    Resolution: Won't Fix

> Avoid finalizing when there's no push at all in a shuffle
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Major
>
> When every partition block of a map output is bigger than
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map
> output. When no map output of a shuffle pushes anything, the shuffle has no
> push at all. In that case, we don't need to send the finalize request.
[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle
[ https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400118#comment-17400118 ]

wuyi commented on SPARK-36530:

SGTM. Could you update SPARK-33701 to include this part? Then I'll close this one.

> Avoid finalizing when there's no push at all in a shuffle
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Major
>
> When every partition block of a map output is bigger than
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map
> output. When no map output of a shuffle pushes anything, the shuffle has no
> push at all. In that case, we don't need to send the finalize request.
[jira] [Commented] (SPARK-36530) Avoid finalizing when there's no push at all in a shuffle
[ https://issues.apache.org/jira/browse/SPARK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400089#comment-17400089 ]

wuyi commented on SPARK-36530:

cc [~mridulm80] [~mshen] Any thoughts?

> Avoid finalizing when there's no push at all in a shuffle
>
> Key: SPARK-36530
> URL: https://issues.apache.org/jira/browse/SPARK-36530
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.2.0, 3.3.0
> Reporter: wuyi
> Priority: Major
>
> When every partition block of a map output is bigger than
> spark.shuffle.push.maxBlockSizeToPush, nothing is pushed for that map
> output. When no map output of a shuffle pushes anything, the shuffle has no
> push at all. In that case, we don't need to send the finalize request.
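The condition proposed in SPARK-36530 can be sketched as a pair of predicates over block sizes. This is only an illustration of the reasoning in the description, not Spark's implementation, and the issue itself was closed as Won't Fix. PushCheckSketch and its methods are hypothetical names, and treating a block as pushable when it is non-empty and no larger than the threshold is an assumption about the exact boundary.

```scala
object PushCheckSketch {
  // Assumption: a block is eligible for push only when it is non-empty and
  // no larger than the spark.shuffle.push.maxBlockSizeToPush threshold.
  def mapOutputHasPush(blockSizes: Seq[Long], maxBlockSizeToPush: Long): Boolean =
    blockSizes.exists(size => size > 0 && size <= maxBlockSizeToPush)

  // The shuffle would need a finalize request only if at least one map
  // output pushed at least one block.
  def shuffleNeedsFinalize(
      mapOutputs: Seq[Seq[Long]],
      maxBlockSizeToPush: Long): Boolean =
    mapOutputs.exists(sizes => mapOutputHasPush(sizes, maxBlockSizeToPush))
}
```

With a 1 MiB threshold, two map outputs whose blocks are all several MiB in size give shuffleNeedsFinalize = false, while a single 512 KiB block in any map output makes it true.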