srowen commented on a change in pull request #23638: [SPARK-26713][CORE] Interrupt pipe IO threads in PipedRDD when task is finished
URL: https://github.com/apache/spark/pull/23638#discussion_r250835149
##########
File path: core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
##########
@@ -156,7 +157,32 @@ private[spark] class PipedRDD[T: ClassTag](
out.close()
}
}
- }.start()
+ }
+ stdinWriterThread.start()
+
+ // interrupts stdin writer and stderr read threads when the corresponding task is finished as a
+ // safe belt. Otherwise, these threads could outlive the task's lifetime. For example:
+ // val pipeRDD = sc.range(1, 100).pipe(Seq("cat"))
+ // val abnormalRDD = pipeRDD.mapPartitions(_ => Iterator.empty)
+ // the iterator generated by PipedRDD is never involved. If the parent RDD's iterator is time
+ // consuming to generate(ShuffledRDD's shuffle operation for example), the outlived stdin writer
+ // thread will consume significant memory and cpu time. Also, there's race condition for
Review comment:
Does the second part of the comment, beginning at "Also,", belong below in
the change to `ShuffleBlockFetcherIterator`?
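The "safe belt" described in the quoted diff comment can be shown with a small sketch that runs without Spark. It is not the PR's implementation. `PipeThreadSketch`, `FakeTaskContext`, and `computeLikePipe` are hypothetical names standing in for `TaskContext` and `PipedRDD.compute`, and the simulated writer thread only increments a counter instead of writing to a child process's stdin. In real Spark code the completion hook would be registered through `TaskContext.addTaskCompletionListener`.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

object PipeThreadSketch {
  // Hypothetical stand-in for Spark's TaskContext: it only stores completion
  // callbacks and runs them when the simulated task finishes.
  final class FakeTaskContext {
    private val listeners = ArrayBuffer.empty[() => Unit]
    def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
    def markTaskCompleted(): Unit = listeners.foreach(f => f())
  }

  // Hypothetical simplification of a piped compute(): the helper thread is
  // started eagerly, while the returned iterator is lazy.
  def computeLikePipe(ctx: FakeTaskContext, written: AtomicLong): (Thread, Iterator[String]) = {
    val writer = new Thread("simulated stdin writer") {
      override def run(): Unit =
        try {
          // Keeps "writing" for as long as nothing stops it.
          while (!Thread.currentThread().isInterrupted) {
            written.incrementAndGet()
            Thread.sleep(1) // throws InterruptedException once interrupted
          }
        } catch {
          case _: InterruptedException => () // exit quietly on interrupt
        }
    }
    writer.setDaemon(true)
    writer.start()
    // Safety belt: interrupt the helper when the task finishes, whether or
    // not anyone ever consumed the returned iterator.
    ctx.addTaskCompletionListener(() => if (writer.isAlive) writer.interrupt())
    (writer, Iterator("line1", "line2"))
  }

  // Mirrors `pipeRDD.mapPartitions(_ => Iterator.empty)`: the piped iterator is
  // dropped unread. Returns whether the writer is still alive after completion.
  def demo(): Boolean = {
    val ctx = new FakeTaskContext
    val written = new AtomicLong(0L)
    val (writer, _) = computeLikePipe(ctx, written)
    val downstream: Iterator[String] = Iterator.empty // piped output ignored
    downstream.foreach(println)
    ctx.markTaskCompleted()
    writer.join(5000L)
    writer.isAlive
  }
}
```

Calling `PipeThreadSketch.demo()` should return `false`: the completion callback interrupts the writer even though nobody read the iterator. Without the listener, the daemon thread would keep running until the JVM exits, which is the kind of leak the diff comment describes.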
----------------------------------------------------------------
This is an automated message from the Apache Git Service.