JoshRosen commented on PR #42742:
URL: https://github.com/apache/spark/pull/42742#issuecomment-1701830082
> If we want to add the catch block on the caller, I think we need to do it for putIteratorAsValues as well?
I don't think we need it there: `putIteratorAsValues` stores deserialized
values, so there's no off-heap memory to be freed or serialization streams to
be closed.
> Do you think it's ok to split this issue into 2 parts, since I think the other change might need some more work? For the current PR, I'll just modify the return value to throw the exception and then file a follow-up JIRA ticket for the cleanup-related work?
As a compromise, what do you think about keeping this as-is (i.e. not
throwing `InterruptedException`) and instead adding a single
`taskContext.killTaskIfInterrupted` call right before we return from
`putIteratorAsBytes`? e.g.
```scala
val res = putIteratorResult match {
  case Right(storedSize) => Right(storedSize)
  case Left(unrollMemoryUsedByThisBlock) =>
    Left(new PartiallySerializedBlock(
      this,
      serializerManager,
      blockId,
      valuesHolder.serializationStream,
      valuesHolder.redirectableStream,
      unrollMemoryUsedByThisBlock,
      memoryMode,
      valuesHolder.bbos,
      values,
      classTag))
}
Option(TaskContext.get()).foreach(_.killTaskIfInterrupted())
res
```
This approach ensures that the byte buffer cleanup logic runs (because
the task completion callback will already have been registered), but also
that we exit in a timely manner rather than continuing to process the
rest of the task's rows.
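To make the interaction concrete, here is a minimal, self-contained sketch of the idea. The `StubTaskContext`, `putIteratorAsBytesSketch`, and `bufferFreed` names are hypothetical stand-ins for Spark's real `TaskContext`, `putIteratorAsBytes`, and buffer-disposal logic; it only illustrates why checking for interruption after the result is built (rather than throwing mid-unroll) still lets the registered cleanup callback run:

```scala
object Sketch {
  // Hypothetical stand-in for Spark's TaskContext; the method names mirror
  // the real API, but this is a sketch, not Spark code.
  class StubTaskContext(@volatile var interrupted: Boolean) {
    private var completionCallbacks = List.empty[() => Unit]

    def addTaskCompletionListener(cb: () => Unit): Unit =
      completionCallbacks ::= cb

    // Mirrors killTaskIfInterrupted: throws once the task has been marked killed.
    def killTaskIfInterrupted(): Unit =
      if (interrupted) throw new RuntimeException("task killed")

    // The task runner fires completion listeners whether the task
    // succeeds or is killed, which is what makes the cleanup reliable.
    def markTaskCompleted(): Unit = completionCallbacks.foreach(_())
  }

  var bufferFreed = false

  def putIteratorAsBytesSketch(ctx: StubTaskContext): Either[String, Long] = {
    // Cleanup is registered up front, so the buffer is freed even if we
    // exit via killTaskIfInterrupted below.
    ctx.addTaskCompletionListener(() => bufferFreed = true)
    val res: Either[String, Long] = Right(42L) // pretend the block stored fine
    ctx.killTaskIfInterrupted()                // exit promptly if the task was killed
    res
  }

  def main(args: Array[String]): Unit = {
    val ctx = new StubTaskContext(interrupted = true)
    val outcome =
      try { putIteratorAsBytesSketch(ctx); "completed" }
      catch { case _: RuntimeException => "killed" }
    ctx.markTaskCompleted() // the task runner always fires completion callbacks
    println(s"$outcome, bufferFreed=$bufferFreed")
  }
}
```

Running the sketch with `interrupted = true` shows the task exiting early via the kill check while the completion callback still frees the buffer.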
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]