JoshRosen commented on PR #42742:
URL: https://github.com/apache/spark/pull/42742#issuecomment-1701720571

   > So it seems freeing the memory is not a problem, but if we return the 
InterruptedException, we would still be risking leaking the direct buffers, 
since we won't get a chance to register this task completion listener? Do you 
think this is safe/handled elsewhere in the caller when the exception is 
received?
   
   If I understand correctly, I think this might be a pre-existing risk that 
we're making worse: there's nothing that prevented the old code from throwing 
arbitrary exceptions when computing the iterator elements.
   
   I wonder whether we should aim to fix that pre-existing bug at a higher 
level. In `putIteratorAsBytes`, we have 
   
   
https://github.com/apache/spark/blob/e72ce91250a9a2c40fd5ed55a50dbc46e4e7e46d/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L349-L367
   
   I'm wondering whether we can restructure that code in order to wrap the 
`putIterator` call and dispose of the `valuesHolder` in case `putIterator` 
fails. Something along these lines (borrowing some code and comments from 
elsewhere in this part of Spark):
   
   ```scala
      val putIteratorResult = Utils.tryWithSafeFinallyAndFailureCallbacks {
        putIterator(blockId, values, classTag, memoryMode, valuesHolder)
      }(catchBlock = {
        // We want to close the output stream in order to free any resources associated with the
        // serializer itself (such as Kryo's internal buffers). close() might cause data to be
        // written, so redirect the output stream to discard that data.
        valuesHolder.redirectableStream.setOutputStream(ByteStreams.nullOutputStream())
        valuesHolder.serializationStream.close()
        valuesHolder.bbos.close()
        valuesHolder.bbos.toChunkedByteBuffer.dispose()
      })

      putIteratorResult match {
        case Right(storedSize) => Right(storedSize)
        case Left(unrollMemoryUsedByThisBlock) =>
          Left(new PartiallySerializedBlock(
            this,
            serializerManager,
            blockId,
            valuesHolder.serializationStream,
            valuesHolder.redirectableStream,
            unrollMemoryUsedByThisBlock,
            memoryMode,
            valuesHolder.bbos,
            values,
            classTag))
      }
   ```
   
   If the `putIterator()` call fails then the `catchBlock` will try to close 
the serialization stream and dispose of the bytes buffered so far. I used 
`tryWithSafeFinallyAndFailureCallbacks` because the catch block does 
non-trivial cleanup work, and I didn't want an exception thrown during that 
cleanup to suppress the original task exception.
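   
   To illustrate the exception-preservation behavior I'm relying on, here is a 
minimal standalone sketch. It is not Spark's implementation of 
`tryWithSafeFinallyAndFailureCallbacks` (which does more than this); the 
`FailureCallbackSketch` object and `tryWithFailureCallback` name are 
hypothetical and exist only for this example:
   
   ```scala
   object FailureCallbackSketch {
     // Hypothetical stand-in, NOT Spark's Utils method: run `block`; if it
     // throws, run `catchBlock` for cleanup and rethrow the ORIGINAL exception.
     def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
       try {
         block
       } catch {
         case original: Throwable =>
           try {
             catchBlock
           } catch {
             // A failure during cleanup is attached as a suppressed exception
             // so that it cannot mask the original failure.
             case cleanupError: Throwable if cleanupError ne original =>
               original.addSuppressed(cleanupError)
           }
           throw original
       }
     }
   }
   ```
   
   With a plain `try`/`catch`, an exception thrown by the cleanup code would 
replace the original one, which would make the real task failure harder to 
diagnose.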
   
   I scoped the `try` block to exclude the `new PartiallySerializedBlock` 
because I wanted to avoid the possibility that two different pieces of cleanup 
logic (the task completion callback and the catch block) both call 
`toChunkedByteBuffer()`.
   
   ---
   
   As I look further into the pre-existing code, I'm spotting a couple of other 
cases where it looks like we're not guaranteed to perform proper cleanup. For 
example, it looks like we're not guaranteed to close the serialization stream 
if downstream partial unrolling code fails (or at least it's not obvious that 
cleanup will happen).
   
   To better test those cases, I think we should add new unit tests to 
`MemoryStoreSuite` covering scenarios where the iterator being stored throws 
an exception at various points.
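   
   As a rough sketch of what such tests could be built on, a small wrapper 
iterator that fails after a configurable number of elements might be enough. 
`FailingIterator` and its `failAfter` parameter are hypothetical names for 
this example, not existing Spark test utilities:
   
   ```scala
   // Hypothetical test helper: yields elements from `underlying` and throws
   // once `failAfter` elements have been returned.
   final class FailingIterator[T](underlying: Iterator[T], failAfter: Int)
       extends Iterator[T] {
     private var returned = 0
   
     override def hasNext: Boolean = underlying.hasNext
   
     override def next(): T = {
       if (returned >= failAfter) {
         throw new RuntimeException(s"Simulated failure after $returned elements")
       }
       returned += 1
       underlying.next()
     }
   }
   ```
   
   A test could then pass `new FailingIterator(values, n)` to 
`putIteratorAsBytes` for several values of `n` (for example 0, a value in the 
middle of unrolling, and the last element) and check that the exception 
propagates and that no buffers or unroll memory are left behind. The exact 
assertions would depend on what the suite can observe.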


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

