Ngone51 commented on a change in pull request #24699: [SPARK-27666][CORE] Do
not release lock while TaskContext already completed
URL: https://github.com/apache/spark/pull/24699#discussion_r288832190
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -992,12 +992,20 @@ private[spark] class BlockManager(
/**
* Release a lock on the given block with explicit TID.
- * The param `taskAttemptId` should be passed in case we can't get the
correct TID from
- * TaskContext, for example, the input iterator of a cached RDD iterates to
the end in a child
+ * The param `taskContext` should be passed in case we can't get the correct
TaskContext
+ * for example, the input iterator of a cached RDD iterates to the end in a
child
* thread.
*/
- def releaseLock(blockId: BlockId, taskAttemptId: Option[Long] = None): Unit
= {
- blockInfoManager.unlock(blockId, taskAttemptId)
+ def releaseLock(blockId: BlockId, taskContext: Option[TaskContext] = None):
Unit = {
+ val taskAttemptId = taskContext.map(_.taskAttemptId())
+ // SPARK-27666. Child thread spawned from task thread could produce race
condition
+ // on block lock releasing. We should prevent child thread from releasing
un-locked
+ // block when task thread has already finished.
+ if (taskContext.isDefined && taskContext.map(_.isCompleted()).get) {
+ logWarning(s"Task $taskAttemptId already completed, not releasing lock
for $blockId")
Review comment:
Thanks for catching this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org