HyukjinKwon commented on a change in pull request #30177:
URL: https://github.com/apache/spark/pull/30177#discussion_r513898999



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +138,18 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  override def hasNext: Boolean =
+    !context.isCompleted() && !context.isInterrupted() && iter.hasNext

Review comment:
       BTW, one thing I would like to note is that this is not a clean fix.
   
   This is rather a band-aid fix, because the consumption of the iterator is asynchronous with the main task thread. So the close can still happen at any point upstream, e.g. in the middle of `hasNext`, and cause the same issue.
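
   To make the remaining window concrete, here is a tiny self-contained sketch (plain Scala, with a made-up `FakeTaskContext` standing in for `TaskContext`; all names are illustrative, not code from this PR). The guard in `hasNext` can pass, the task can then complete and close the upstream, and the consuming thread still goes on to call `next()`:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for TaskContext, exposing only the flags we check.
final class FakeTaskContext {
  private val completed = new AtomicBoolean(false)
  def markCompleted(): Unit = completed.set(true)
  def isCompleted(): Boolean = completed.get()
  def isInterrupted(): Boolean = false
}

// Same shape as the wrapper in this diff, but against the fake context.
class GuardedIterator[T](iter: Iterator[T], ctx: FakeTaskContext) extends Iterator[T] {
  override def hasNext: Boolean =
    !ctx.isCompleted() && !ctx.isInterrupted() && iter.hasNext
  override def next(): T = iter.next()
}

object RaceWindowSketch {
  def main(args: Array[String]): Unit = {
    val ctx = new FakeTaskContext
    val rows = new GuardedIterator(Iterator(1, 2, 3), ctx)

    // Consumer thread: the guard passes while the task is still running...
    val canRead = rows.hasNext
    // ...but the main task thread can finish right here and close the parent.
    ctx.markCompleted()
    // The consumer still calls next(); in the real code this is where it can
    // touch an upstream that has already been cleaned up.
    if (canRead) println(rows.next())
  }
}
```

   Of course the real failure mode needs actual threads and an upstream that releases resources on task completion; the sketch only shows where the unsynchronized window sits.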
   
   To fix this completely, IMHO, we would have to synchronize the two threads entirely, but then there would be no point in having a separate thread to process Python UDFs.
   
   I think the cause is basically the same as the `input_file_name` issue, which comes from the lack of synchronization between this thread and the main thread (see https://github.com/apache/spark/pull/24958#issuecomment-511364075).
   
   If there's a better option, that would be great, but I think this fix is good enough (given that I see a similar approach in `ContinuousQueuedDataReader`).
   
   Let me know if I missed something here.
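
   (For reference, my rough mental model of how the wrapper gets wired in, with illustrative names only and not necessarily the exact code in this PR: the partition iterator is wrapped with the current task context before it is handed to the thread that feeds the Python worker.)

```scala
import org.apache.spark.TaskContext

// Illustrative only: wrap the parent partition iterator so the consuming
// (writer) thread stops pulling rows once the task completes or is killed.
// Assumes the ContextAwareIterator from this diff is in scope.
def guardPartition[T](iter: Iterator[T]): Iterator[T] = {
  val context = TaskContext.get()
  new ContextAwareIterator(iter, context)
}
```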




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


