advancedxy commented on a change in pull request #23638: [SPARK-26713][CORE]
Interrupt pipe IO threads in PipedRDD when task is finished
URL: https://github.com/apache/spark/pull/23638#discussion_r250855785
##########
File path: core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
##########
@@ -156,7 +157,32 @@ private[spark] class PipedRDD[T: ClassTag](
out.close()
}
}
- }.start()
+ }
+ stdinWriterThread.start()
+
+    // Interrupt the stdin writer and stderr reader threads when the corresponding task
+    // finishes, as a safety belt. Otherwise, these threads could outlive the task's
+    // lifetime. For example:
+    //   val pipeRDD = sc.range(1, 100).pipe(Seq("cat"))
+    //   val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
+    // The iterator generated by PipedRDD is never invoked. If the parent RDD's iterator
+    // is time-consuming to generate (ShuffledRDD's shuffle operation, for example), the
+    // outlived stdin writer thread will consume significant memory and CPU time. Also,
+    // there's a race condition for
Review comment:
This is a tricky one. After this PR's fix to `ShuffleBlockFetcherIterator`, the race
condition should be very rare, since only one pending `next` call can hang. However, I
couldn't find a good place to put the comment above in `ShuffleBlockFetcherIterator`,
so it lives inside this task completion listener.
Do you have any suggestions?
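To make the "outlived writer thread" scenario concrete, here is a minimal, self-contained sketch (not Spark's actual code; `InterruptPipeWriter` and the tiny pipe buffer size are illustrative). A writer thread blocks on a full pipe that nobody drains, modeling a stdin writer whose task iterator is never consumed; interrupting it, as a task completion listener would, unblocks the `write` with an `InterruptedIOException` and lets the thread clean up:

```scala
import java.io.{InterruptedIOException, PipedInputStream, PipedOutputStream}
import java.util.concurrent.CountDownLatch

object InterruptPipeWriter {
  def run(): Boolean = {
    val out = new PipedOutputStream()
    // Tiny buffer so the writer blocks almost immediately, like a child
    // process whose stdin is never read.
    val in = new PipedInputStream(out, 16)
    val sawInterrupt = new CountDownLatch(1)

    val stdinWriterThread = new Thread("stdin writer") {
      override def run(): Unit = {
        try {
          // Blocks once the pipe buffer is full; nobody ever reads from `in`.
          while (true) out.write(Array.fill(64)(0: Byte))
        } catch {
          // Piped streams translate Thread.interrupt into InterruptedIOException.
          case _: InterruptedIOException => sawInterrupt.countDown()
        } finally {
          out.close()
        }
      }
    }
    stdinWriterThread.setDaemon(true)
    stdinWriterThread.start()

    Thread.sleep(200) // let the writer fill the pipe and block

    // What the task completion listener does: interrupt the outlived thread.
    stdinWriterThread.interrupt()
    sawInterrupt.await()
    true
  }

  def main(args: Array[String]): Unit = println(InterruptPipeWriter.run())
}
```

Without the interrupt, the writer would stay blocked in `write` indefinitely, which is the leak this change guards against.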