HyukjinKwon commented on a change in pull request #30177:
URL: https://github.com/apache/spark/pull/30177#discussion_r513898999
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +138,18 @@ trait EvalPythonExec extends UnaryExecNode {
}
}
}
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+ override def hasNext: Boolean =
+ !context.isCompleted() && !context.isInterrupted() && iter.hasNext
Review comment:
BTW, one thing I would like to note is that this is not a clean fix.
This is rather a band-aid fix, because the consumption of the iterator is
asynchronous with respect to the main task thread. So the close can happen at
any point upstream, e.g. in the middle of `hasNext`, and it can still cause the
same issue (see the sketch below).
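To make that window concrete, here is a minimal standalone sketch of the race. It is my own simplification, not Spark code: an `AtomicBoolean` stands in for `context.isCompleted()`, a slow `hasNext` stands in for the upstream read, and all names (`RaceWindowSketch`, `guarded`, `writer`) are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Standalone simplification: the AtomicBoolean plays the role of context.isCompleted(),
// and the slow hasNext mimics an upstream that actually reads data.
object RaceWindowSketch {
  def main(args: Array[String]): Unit = {
    val taskCompleted = new AtomicBoolean(false)

    // Parent iterator whose hasNext takes a while, like an upstream read.
    val parent: Iterator[Int] = new Iterator[Int] {
      private var i = 0
      override def hasNext: Boolean = { Thread.sleep(10); i < 1000 }
      override def next(): Int = { i += 1; i }
    }

    // Guarded iterator: the completion check and the delegate call are two separate
    // steps, so the task can complete between them, or while parent.hasNext is running.
    val guarded: Iterator[Int] = new Iterator[Int] {
      override def hasNext: Boolean = !taskCompleted.get() && parent.hasNext
      override def next(): Int = parent.next()
    }

    // Writer thread, standing in for the thread that feeds rows to the Python worker.
    val writer = new Thread(() => while (guarded.hasNext) guarded.next())
    writer.start()

    Thread.sleep(25)
    taskCompleted.set(true) // "task ends" while the writer may already be inside parent.hasNext
    writer.join()
    // The guard stops further consumption quickly, but it cannot cancel the call that is
    // already in flight, which is why this is a band-aid rather than a complete fix.
  }
}
```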
To completely fix this, IMHO, we would have to synchronize fully, and then
there would be no point in having a separate thread to process Python UDFs.
I think the cause is basically the same as the `input_file_name` issue, which
comes from the lack of synchronization between this thread and the main thread
(see https://github.com/apache/spark/pull/24958#issuecomment-511364075).
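As a generic illustration of that lack of synchronization (this is not Spark's actual mechanism for `input_file_name`, just a plain `ThreadLocal` to show the idea; the names are made up), per-thread state set on the main thread is not visible to, or kept in sync with, a separately running worker thread:

```scala
// Generic illustration only, not Spark code: per-thread state set on the main
// thread is not visible to, or kept in sync with, a separately started thread.
object ThreadLocalSketch {
  private val currentFile = new ThreadLocal[String] {
    override def initialValue(): String = "<unset>"
  }

  def main(args: Array[String]): Unit = {
    currentFile.set("file-A") // set on the "main task" thread

    val worker = new Thread(() => {
      // A plain ThreadLocal is not inherited, so this prints "<unset>"; even an
      // InheritableThreadLocal only copies the value at thread creation time,
      // so later updates made on the main thread would still not be observed here.
      println(s"worker sees: ${currentFile.get()}")
    })
    worker.start()
    worker.join()

    println(s"main sees:   ${currentFile.get()}") // still "file-A"
  }
}
```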
If there's a better option, that would be great, but I think this fix is good
enough (given that I see a similar approach in `ContinuousQueuedDataReader`).
Let me know if I missed something here.