BryanCutler commented on a change in pull request #25515: [SPARK-27659][PYTHON]
Allow PySpark to prefetch during toLocalIterator
URL: https://github.com/apache/spark/pull/25515#discussion_r317755713
##########
File path: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
##########
@@ -196,10 +204,15 @@ private[spark] object PythonRDD extends Logging {
      // Read request for data, value of zero will stop iteration or non-zero to continue
      if (in.readInt() == 0) {
        complete = true
-      } else if (collectPartitionIter.hasNext) {
+      } else if (prefetchIter.hasNext) {
        // Client requested more data, attempt to collect the next partition
-        val partitionArray = collectPartitionIter.next()
+        val partitionFuture = prefetchIter.next()
+        // Cause the next job to be submitted if prefetchPartitions is enabled.
+        if (prefetchPartitions) {
+          prefetchIter.headOption
+        }
+        val partitionArray = ThreadUtils.awaitResult(partitionFuture, Duration.Inf)
Review comment:
Ah yes, you are totally right. That would block while getting the prefetched
partition. This looks pretty good to me then.
One question, though: when should the first job be triggered? I think the old
behavior used to start the first job as soon as `toLocalIterator()` was called.
From what I can tell, this will wait until the first iteration and then trigger
the first 2 jobs. Either way is probably fine, but you might get slightly
better performance by starting the first job immediately.
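To illustrate the pattern under discussion, here is a minimal, hypothetical Python sketch of one-step prefetching over an iterator: while the consumer is still processing the current item, the next one is already being fetched on a background thread (analogous to `prefetchIter.headOption` kicking off the next job before `awaitResult` blocks). The names `prefetch_iter` and `fetch` are illustrative only, not part of Spark's API, and like the PR code this version does not start the first fetch until the first iteration.

```python
from concurrent.futures import ThreadPoolExecutor

def prefetch_iter(source, fetch):
    """Yield fetch(x) for each x in source, staying one fetch ahead."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        items = iter(source)
        try:
            # Submit the first fetch (happens lazily, on first iteration).
            future = pool.submit(fetch, next(items))
        except StopIteration:
            return
        for nxt in items:
            result = future.result()          # block on the current item
            future = pool.submit(fetch, nxt)  # start the next fetch early
            yield result
        yield future.result()                 # drain the last in-flight fetch
```

For example, `list(prefetch_iter(range(3), lambda x: x * 2))` yields `[0, 2, 4]`; each doubling runs on the worker thread while the caller consumes the previous result.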