Ngone51 commented on code in PR #49413:
URL: https://github.com/apache/spark/pull/49413#discussion_r1913378870


##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
     assert(executionOfInterruptibleCounter.get() < numElements)
  }
 
+  Seq(true, false).foreach { interruptible =>
+
+    val (hint1, hint2) = if (interruptible) {
+      (" not", "")
+    } else {
+      ("", " not")
+    }
+
+    val testName = s"SPARK-50768:$hint1 use 
TaskContext.createResourceUninterruptibly " +
+      s"would$hint2 cause stream leak on task interruption"
+
+    test(testName) {
+      import org.apache.spark.JobCancellationSuite._
+      withTempDir { dir =>
+
+        // `InterruptionSensitiveInputStream` is designed to easily leak the 
underlying stream
+        // when task thread interruption happens during its initialization.
+        class InterruptionSensitiveInputStream(fileHint: String) extends 
InputStream {
+          private var underlying: InputStream = _
+
+          def initialize(): InputStream = {
+            val in: InputStream = new InputStream {
+
+              open()
+
+              private def dumpFile(typeName: String): Unit = {
+                var fileOut: FileOutputStream = null
+                var objOut: ObjectOutputStream = null
+                try {
+                  val file = new File(dir, s"$typeName.$fileHint")
+                  fileOut = new FileOutputStream(file)
+                  objOut = new ObjectOutputStream(fileOut)
+                  objOut.writeBoolean(true)
+                  objOut.flush()
+                } finally {
+                  if (fileOut != null) {
+                    fileOut.close()
+                  }
+                  if (objOut != null) {
+                    objOut.close()
+                  }
+                }
+
+              }
+
+              private def open(): Unit = {
+                dumpFile("open")
+              }
+
+              override def close(): Unit = {
+                dumpFile("close")
+              }
+
+              override def read(): Int = -1
+            }
+
+            // Leave some time for the task to be interrupted during the
+            // creation of `InterruptionSensitiveInputStream`.
+            Thread.sleep(5000)

Review Comment:
   It is necessary to ensure the task is interrupted during 
`InterruptionSensitiveInputStream#initialize()` so that we can test the leaked 
stream. Increase the sleep time should less likely be flaky.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to