xuanyuanking commented on code in PR #49413:
URL: https://github.com/apache/spark/pull/49413#discussion_r1911781043
##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
assert(executionOfInterruptibleCounter.get() < numElements)
}
+ Seq(true, false).foreach { interruptible =>
+
+ val (hint1, hint2) = if (interruptible) {
+ (" not", "")
+ } else {
+ ("", " not")
+ }
+
+ val testName = s"SPARK-50768:$hint1 use
TaskContext.createResourceUninterruptibly " +
+ s"would$hint2 cause stream leak on task interruption"
+
+ test(testName) {
+ import org.apache.spark.JobCancellationSuite._
+ withTempDir { dir =>
+
+ // `InterruptionSensitiveInputStream` is designed to easily leak the
underlying stream
Review Comment:
It would be great to explain why `InterruptionSensitiveInputStream` can easily leak the underlying stream, and to call out the key conditions that trigger the leak — such as creating internal resources during `initialize()` — as highlighted in the PR description.
##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
/** Gets local properties set upstream in the driver. */
private[spark] def getLocalProperties: Properties
+
+
+ /** Whether the current task is allowed to interrupt. */
+ private[spark] def interruptible(): Boolean
+
+ /**
+ * Pending the interruption request until the task is able to
+ * interrupt after creating the resource uninterruptibly.
+ */
+ private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread],
reason: String): Unit
+
+ /**
+ * Creating a closeable resource uninterruptibly. A task is not allowed to
interrupt in this
+ * state until the resource creation finishes.
+ */
+ private[spark] def createResourceUninterruptibly[T <:
Closeable](resourceBuilder: => T): T
Review Comment:
I noticed that the PR description mentions a follow-up that will apply this function.
Could you briefly explain how this method is expected to be invoked, or provide a
simple usage example?
##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
/** Gets local properties set upstream in the driver. */
private[spark] def getLocalProperties: Properties
+
+
+ /** Whether the current task is allowed to interrupt. */
+ private[spark] def interruptible(): Boolean
+
+ /**
+ * Pending the interruption request until the task is able to
Review Comment:
code style nit
##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
/** Gets local properties set upstream in the driver. */
private[spark] def getLocalProperties: Properties
+
Review Comment:
nit: extra line?
##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
assert(executionOfInterruptibleCounter.get() < numElements)
}
+ Seq(true, false).foreach { interruptible =>
+
+ val (hint1, hint2) = if (interruptible) {
+ (" not", "")
+ } else {
+ ("", " not")
+ }
+
+ val testName = s"SPARK-50768:$hint1 use
TaskContext.createResourceUninterruptibly " +
+ s"would$hint2 cause stream leak on task interruption"
+
+ test(testName) {
+ import org.apache.spark.JobCancellationSuite._
+ withTempDir { dir =>
+
+ // `InterruptionSensitiveInputStream` is designed to easily leak the
underlying stream
+ // when task thread interruption happens during its initialization.
+ class InterruptionSensitiveInputStream(fileHint: String) extends
InputStream {
+ private var underlying: InputStream = _
+
+ def initialize(): InputStream = {
+ val in: InputStream = new InputStream {
+
+ open()
+
+ private def dumpFile(typeName: String): Unit = {
+ var fileOut: FileOutputStream = null
+ var objOut: ObjectOutputStream = null
+ try {
+ val file = new File(dir, s"$typeName.$fileHint")
+ fileOut = new FileOutputStream(file)
+ objOut = new ObjectOutputStream(fileOut)
+ objOut.writeBoolean(true)
+ objOut.flush()
+ } finally {
+ if (fileOut != null) {
+ fileOut.close()
+ }
+ if (objOut != null) {
+ objOut.close()
+ }
+ }
+
+ }
+
+ private def open(): Unit = {
+ dumpFile("open")
+ }
+
+ override def close(): Unit = {
+ dumpFile("close")
+ }
+
+ override def read(): Int = -1
+ }
+
+ // Leave some time for the task to be interrupted during the
+ // creation of `InterruptionSensitiveInputStream`.
+ Thread.sleep(5000)
Review Comment:
How important is this sleep within the task? Could it potentially make the
test flaky?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]