zsxwing commented on a change in pull request #30242:
URL: https://github.com/apache/spark/pull/30242#discussion_r530075736



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +139,47 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  val thread = new AtomicReference[Thread]()
+
+  if (iter.hasNext) {
+    val failed = new AtomicBoolean(false)
+
+    context.addTaskFailureListener { (_, _) =>
+      failed.set(true)
+    }
+
+    context.addTaskCompletionListener[Unit] { _ =>
+      var thread = this.thread.get()
+
+      while (thread == null && !failed.get()) {
+        // Wait for a while since the writer thread might not have reached the point of consuming the iterator yet.
+        context.wait(10)

Review comment:
       I didn't realize that. It's better not to rely on this in a listener; it's something we should consider improving in the future. Holding an implicit lock while calling a user's listener is a bad idea because it can easily cause a surprising deadlock.
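
       For what it's worth, one way to avoid depending on the implicit `context` monitor would be to wait on a lock the iterator itself owns. A rough sketch along those lines (hypothetical, not the code in this PR; the class name and fields are made up for illustration):

import org.apache.spark.TaskContext

// Hypothetical variant of the iterator above: the listeners wait on a lock
// owned by this class rather than on the monitor of `context`, so they do not
// assume anything about which locks the listener-invoking thread holds.
class ContextAwareIteratorSketch[IN](iter: Iterator[IN], context: TaskContext)
  extends Iterator[IN] {

  private val lock = new Object
  private var consumer: Thread = _   // guarded by `lock`
  private var failed = false         // guarded by `lock`

  context.addTaskFailureListener { (_, _) =>
    lock.synchronized {
      failed = true
      lock.notifyAll()
    }
  }

  context.addTaskCompletionListener[Unit] { _ =>
    lock.synchronized {
      // Wait until the consuming thread has shown up or the task has failed.
      while (consumer == null && !failed) {
        lock.wait(10)
      }
    }
  }

  override def hasNext: Boolean = {
    lock.synchronized {
      if (consumer == null) {
        consumer = Thread.currentThread()
        lock.notifyAll()
      }
    }
    iter.hasNext
  }

  override def next(): IN = iter.next()
}

       With a dedicated lock the wait/notify protocol is self-contained, so it would keep working even if listeners were later invoked without holding the `context` monitor.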



