Ngone51 commented on code in PR #49413:
URL: https://github.com/apache/spark/pull/49413#discussion_r1913378870
##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(executionOfInterruptibleCounter.get() < numElements)
}
+ Seq(true, false).foreach { interruptible =>
+
+ val (hint1, hint2) = if (interruptible) {
+ (" not", "")
+ } else {
+ ("", " not")
+ }
+
+ val testName = s"SPARK-50768:$hint1 use TaskContext.createResourceUninterruptibly " +
+ s"would$hint2 cause stream leak on task interruption"
+
+ test(testName) {
+ import org.apache.spark.JobCancellationSuite._
+ withTempDir { dir =>
+
+ // `InterruptionSensitiveInputStream` is designed to easily leak the underlying stream
+ // when task thread interruption happens during its initialization.
+ class InterruptionSensitiveInputStream(fileHint: String) extends InputStream {
+ private var underlying: InputStream = _
+
+ def initialize(): InputStream = {
+ val in: InputStream = new InputStream {
+
+ open()
+
+ private def dumpFile(typeName: String): Unit = {
+ var fileOut: FileOutputStream = null
+ var objOut: ObjectOutputStream = null
+ try {
+ val file = new File(dir, s"$typeName.$fileHint")
+ fileOut = new FileOutputStream(file)
+ objOut = new ObjectOutputStream(fileOut)
+ objOut.writeBoolean(true)
+ objOut.flush()
+ } finally {
+ if (fileOut != null) {
+ fileOut.close()
+ }
+ if (objOut != null) {
+ objOut.close()
+ }
+ }
+
+ }
+
+ private def open(): Unit = {
+ dumpFile("open")
+ }
+
+ override def close(): Unit = {
+ dumpFile("close")
+ }
+
+ override def read(): Int = -1
+ }
+
+ // Leave some time for the task to be interrupted during the
+ // creation of `InterruptionSensitiveInputStream`.
+ Thread.sleep(5000)
Review Comment:
It is necessary to ensure the task is interrupted during
`InterruptionSensitiveInputStream#initialize()` so that we can test the leaked
stream. Increasing the sleep time should make the test less likely to be flaky.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]