GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16989#discussion_r117170463
  
    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
    @@ -126,4 +150,50 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
           }
         }
       }
    +
    +  private class DownloadCallback implements StreamCallback {
    +
    +    private WritableByteChannel channel = null;
    +    private File targetFile = null;
    +    private int chunkIndex;
    +
    +    public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
    +      this.targetFile = targetFile;
    +      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
    +      this.chunkIndex = chunkIndex;
    +    }
    +
    +    @Override
    +    public void onData(String streamId, ByteBuffer buf) throws IOException {
    +      channel.write(buf);
    +    }
    +
    +    @Override
    +    public void onComplete(String streamId) throws IOException {
    +      channel.close();
    +      ManagedBuffer buffer = new FileSegmentManagedBuffer(
    +        transportConf, targetFile, 0, targetFile.length()) {
    +        @Override
    +        public ManagedBuffer release() {
    --- End diff --
    
    Is `release()` guaranteed to be called? The `targetFile` name is chosen by
    higher-level code inside the Spark task itself, so at that point we already
    know which files need to be cleaned up. We could register them with a task
    completion callback to guarantee that they are deleted after the task
    finishes. This would provide another layer of defense against leaking
    files on disk.
    
    Aside: this is an example of why we need a TaskContext method for
    allocating task-scoped temporary file names: it would greatly simplify our
    story around temporary files such as sort spills. Alternatively, we could
    give each task a temp directory that holds all of its temporary files and
    simply delete the entire directory at the end of the task. This proposal is
    a larger change and should probably be done in a separate PR.
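
A rough sketch of the per-task temp directory alternative, using only `java.nio.file`. `TaskTempDir`, `newTempFile`, and the `task-<id>-` directory prefix are hypothetical names for illustration; no such Spark API exists in this PR, and a real version would likely place the directory under Spark's configured local dirs.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical helper: one directory holds all temp files of a single task. */
class TaskTempDir implements AutoCloseable {
  private final Path dir;

  TaskTempDir(long taskAttemptId) throws IOException {
    // Assumption: the system temp dir is acceptable for this sketch.
    this.dir = Files.createTempDirectory("task-" + taskAttemptId + "-");
  }

  Path path() {
    return dir;
  }

  /** Allocates a task-scoped temp file (e.g. a spill or a fetched block). */
  Path newTempFile(String prefix) throws IOException {
    return Files.createTempFile(dir, prefix, ".tmp");
  }

  /** Deletes the whole directory tree; meant to run at the end of the task. */
  @Override
  public void close() throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
          throws IOException {
        Files.delete(file);
        return FileVisitResult.CONTINUE;
      }

      @Override
      public FileVisitResult postVisitDirectory(Path d, IOException exc)
          throws IOException {
        if (exc != null) {
          throw exc;
        }
        // Children are already gone, so the directory can be removed.
        Files.delete(d);
        return FileVisitResult.CONTINUE;
      }
    });
  }
}
```

Because the helper is `AutoCloseable`, a task body could wrap its work in try-with-resources so that cleanup runs on both success and failure without tracking individual file names.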


