Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-28 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2650064663


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   ping @cloud-fan WDYT? Any more suggestions for this issue?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-25 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2646986092


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   Yeah, unless we reconsider (https://github.com/apache/spark/pull/36056), the current code is the best we can do for now, and writing to an absolute path (via `newTaskTempFileAbsPath`) should be a relatively rare scenario in production.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-25 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2646878201


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   To summarize: this is best-effort, and we may still leak disk space in certain cases?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-25 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2646780439


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   > do we need to call `deleteOnExit` at the executor side against 
`stagingDir`?
   
   We can't do this, since an executor may exit after its tasks finish, and the job commit sometimes executes after the corresponding executor has exited. Doing this could cause the loss of temp data files written by tasks.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-25 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2646766809


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   do we need to call `deleteOnExit` at the executor side against `stagingDir`?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-24 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2645184591


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   `stagingDir` is created in two situations:
   
   1. When `dynamicPartitionOverwrite=true`, by `committer.setupJob(jobContext)` on the driver side.
   2. When writing to an absolute path (via `newTaskTempFileAbsPath`) on the executor side.
   
   So at this point, checking the existence of `stagingDir` is the same as checking `dynamicPartitionOverwrite = true`.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-24 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2645178562


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   oh, how about checking the existence of `stagingDir`?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-24 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2645139000


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   You mean passing `WriteJobDescription` to `FileCommitProtocol`'s `setupJob`? That would change the protocol...I don't think it's worth it.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-24 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2645129712


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   or should we check whether the output file is an absolute path when `dynamicPartitionOverwrite=false`, before we call `deleteOnExit`?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-24 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2645107472


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   `stagingDir` is used in the following situations: 
   
   - When there is an output file with an absolute path (via `newTaskTempFileAbsPath`); this is unrelated to `dynamicPartitionOverwrite`.
   - When `dynamicPartitionOverwrite=true`, it is used for the final move of partition files.
   
   When `dynamicPartitionOverwrite=false`, `stagingDir` is not created under the output path by `committer.setupJob(jobContext)`, so calling `fs.deleteOnExit(stagingDir)` here would fail to register the path, since `deleteOnExit()` checks that the file exists:
   ```
   public boolean deleteOnExit(Path f) throws IOException {
     if (!this.exists(f)) {
       return false;
     } else {
       synchronized (this.deleteOnExit) {
         this.deleteOnExit.add(f);
         return true;
       }
     }
   }
   ```
   
   When `stagingDir` is used for absolute paths (via `newTaskTempFileAbsPath`), it is created by the executors, and we can't call `fs.deleteOnExit(stagingDir)` for it here, since that directory is only committed later on the driver side.
   
   So after the current PR, a `stagingDir` used for absolute paths (via `newTaskTempFileAbsPath`) may still leave a staging path behind. **(This kind of scenario is usually very rare.)**
   
   What we could do instead is create `stagingDir` here first and then add it to the `deleteOnExit` set; that would cover all cases.
   
   And this path is always deleted after `commitJob` or `abortJob`:
   (screenshot: https://github.com/user-attachments/assets/e76da80b-50cb-413d-8498-d35e3655c29e)
   
   
   


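The lifecycle above (only register `stagingDir` for delete-on-exit once it is known to exist, and drop the registration after the job cleans it up) can be sketched with a small in-memory registry. This is a hedged model against the local filesystem, not Hadoop's `FileSystem` API: the method names mirror Hadoop's `deleteOnExit`/`cancelDeleteOnExit` semantics, but the `DeleteOnExitRegistry` class itself is invented for illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for Hadoop FileSystem's delete-on-exit
// bookkeeping. Like Hadoop, deleteOnExit refuses paths that do not exist
// yet -- which is why the commit protocol can only register stagingDir
// after setupJob is known to have created it.
class DeleteOnExitRegistry {
    private final Set<Path> pending = new HashSet<>();

    boolean deleteOnExit(Path p) {
        if (!Files.exists(p)) {
            return false;                 // non-existent path: registration rejected
        }
        synchronized (pending) {
            return pending.add(p);
        }
    }

    boolean cancelDeleteOnExit(Path p) {
        synchronized (pending) {
            return pending.remove(p);     // call after a manual delete, or the entry lingers
        }
    }

    int pendingCount() {
        synchronized (pending) {
            return pending.size();
        }
    }
}
```

With this model, registering before the directory has been created is a silent no-op, matching the behavior described above.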







Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2644753399


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   can you explain the full life cycle of `stagingDir`? I don't understand why 
`if (dynamicPartitionOverwrite)` is needed here.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2643567604


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   You mean all temp output paths? A normal temp path is always deleted the next time we write to the same table, so it's generally not a problem.












Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2643493069


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   Shall we always mark the staging dir as `deleteOnExit`? In which cases does Spark not delete the staging dir after commit/abort?









Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2643189360


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   Yea, should we add a condition here?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2643042957


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,19 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {

Review Comment:
   looking at `commitJob` or `abortJob`, we remove the staging directory even 
if `dynamicPartitionOverwrite` is not true?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


AngersZh commented on PR #53406:
URL: https://github.com/apache/spark/pull/53406#issuecomment-3686178992

   The failed GA jobs passed after a rerun. Should I re-trigger all GA jobs again?





Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-23 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2642337363


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,15 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {
+val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+fs.deleteOnExit(stagingDir)

Review Comment:
   Done



##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -229,6 +238,7 @@ class HadoopMapReduceCommitProtocol(
   }
 
   fs.delete(stagingDir, true)
+  fs.cancelDeleteOnExit(stagingDir)

Review Comment:
   Done






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-22 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2642300188


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,15 @@ class HadoopMapReduceCommitProtocol(
 val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  if (dynamicPartitionOverwrite) {
+val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+fs.deleteOnExit(stagingDir)

Review Comment:
   let's add a code comment to explain why this is necessary.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-22 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2642298701


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -229,6 +238,7 @@ class HadoopMapReduceCommitProtocol(
   }
 
   fs.delete(stagingDir, true)
+  fs.cancelDeleteOnExit(stagingDir)

Review Comment:
   let's add a code comment to explain it. It's not intuitive that we need to cancel the registration to avoid a memory leak, even if the directory has already been deleted.


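Concretely, the reason the cancellation matters even after `fs.delete` succeeds is that the delete-on-exit set is separate bookkeeping: deleting the directory does not remove its entry, and the set lives as long as the cached `FileSystem` instance (often the whole driver lifetime). A hedged sketch, with a plain `Set<String>` standing in for that internal set; the `StagingCleanup` class and its method names are invented for illustration:

```java
import java.util.HashSet;
import java.util.Set;

// Models the ordering discussed in this PR: deleting the staging
// directory removes the files but not the delete-on-exit entry, so each
// committed job leaks one entry unless cancelDeleteOnExit is also called.
class StagingCleanup {
    // Stand-in for FileSystem's internal delete-on-exit set.
    final Set<String> deleteOnExitPaths = new HashSet<>();

    void setupJob(String stagingDir) {
        deleteOnExitPaths.add(stagingDir);        // fs.deleteOnExit(stagingDir)
    }

    void commitJob(String stagingDir, boolean cancelRegistration) {
        // fs.delete(stagingDir, true) would happen here; it does not
        // touch deleteOnExitPaths.
        if (cancelRegistration) {
            deleteOnExitPaths.remove(stagingDir); // fs.cancelDeleteOnExit(stagingDir)
        }
    }
}
```

Running many jobs without the cancellation leaves one stale path per job in the set.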




Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-22 Thread via GitHub


AngersZh commented on PR #53406:
URL: https://github.com/apache/spark/pull/53406#issuecomment-3684821458

   gentle ping @dongjoon-hyun @cloud-fan @mridulm Any more suggestions?





Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-19 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2634268672


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -229,6 +238,7 @@ class HadoopMapReduceCommitProtocol(
   }
 
   fs.delete(stagingDir, true)
+  fs.cancelDeleteOnExit(stagingDir)

Review Comment:
   It seems it doesn't work the way you thought.






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-19 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2634253053


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -229,6 +238,7 @@ class HadoopMapReduceCommitProtocol(
   }
 
   fs.delete(stagingDir, true)
+  fs.cancelDeleteOnExit(stagingDir)

Review Comment:
   I thought it was implied when `stagingDir` is deleted; is that not the case?






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-18 Thread via GitHub


AngersZh commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2633323195


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,13 @@ class HadoopMapReduceCommitProtocol(
  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+  fs.deleteOnExit(stagingDir)

Review Comment:
   How about current? ping @dongjoon-hyun @mridulm 






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-11 Thread via GitHub


mridulm commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2611624445


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,13 @@ class HadoopMapReduceCommitProtocol(
  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+  fs.deleteOnExit(stagingDir)

Review Comment:
   For streaming jobs, given their really long duration and large number of 
commits, we will keep accumulating the `deleteOnExit` metadata to be tracked - 
and eventually OOM.
   I am assuming that is what @dongjoon-hyun is referring to - and I agree it 
is a risk.
   
   At a minimum, we can derisk it by calling `cancelDeleteOnExit` for 
successful commits.
   
   Btw, please do not catch `Throwable`







Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-11 Thread via GitHub


cloud-fan commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2610029917


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,13 @@ class HadoopMapReduceCommitProtocol(
  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+  fs.deleteOnExit(stagingDir)

Review Comment:
   I don't think streaming jobs need to look at the file source staging dir when restarting; this is not a state store or checkpoint. cc @HeartSaVioR 






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-10 Thread via GitHub


dongjoon-hyun commented on code in PR #53406:
URL: https://github.com/apache/spark/pull/53406#discussion_r2607700611


##
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##
@@ -178,6 +178,13 @@ class HadoopMapReduceCommitProtocol(
  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
 committer = setupCommitter(taskAttemptContext)
 committer.setupJob(jobContext)
+try {
+  val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+  fs.deleteOnExit(stagingDir)

Review Comment:
   For streaming Spark jobs, this grows indefinitely, doesn't it, @AngersZh ?
   
   IMO, we had better treat this kind of behavior change as an `Improvement` instead of a `Bug` to avoid any side effects. WDYT?
   
   cc @mridulm 






Re: [PR] [SPARK-54651][CORE] Driver should delete temporary files when exiting during commit process [spark]

2025-12-09 Thread via GitHub


AngersZh commented on PR #53406:
URL: https://github.com/apache/spark/pull/53406#issuecomment-3630909802

   Gentle ping @cloud-fan @HyukjinKwon @dongjoon-hyun Could you take a look?

