BryanCutler commented on a change in pull request #25515: [SPARK-27659][PYTHON] Allow PySpark to prefetch during toLocalIterator
URL: https://github.com/apache/spark/pull/25515#discussion_r316896955
##########
File path: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
##########
@@ -196,10 +204,15 @@ private[spark] object PythonRDD extends Logging {
       // Read request for data, value of zero will stop iteration or non-zero to continue
       if (in.readInt() == 0) {
         complete = true
-      } else if (collectPartitionIter.hasNext) {
+      } else if (prefetchIter.hasNext) {
         // Client requested more data, attempt to collect the next partition
-        val partitionArray = collectPartitionIter.next()
+        val partitionFuture = prefetchIter.next()
+        // Cause the next job to be submitted if prefetchPartitions is enabled.
+        if (prefetchPartitions) {
+          prefetchIter.headOption
+        }
+        val partitionArray = ThreadUtils.awaitResult(partitionFuture, Duration.Inf)
Review comment:
It might be best to avoid `awaitResult` if possible. Could you build a buffered iterator yourself? Maybe something like:
```scala
// Lag-by-one buffer: pull the first partition eagerly, then on each
// next() hand back the buffered partition while pulling the following one,
// so the next collect job is already running while the caller consumes data.
var next = collectPartitionIter.next()
val prefetchIter = collectPartitionIter.map { part =>
  val tmp = next
  next = part
  tmp
} ++ Iterator(next) // ++ is lazy, so this picks up the final buffered partition
```
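For context, the same lag-by-one prefetch idea can be sketched outside Spark. The snippet below is an illustrative Python sketch (the `prefetch_iter` helper and its `compute` callback are hypothetical, not part of this PR): while the caller is consuming item k, item k+1 is already being computed in a background thread, which is what submitting the next collect job early achieves.

```python
from concurrent.futures import ThreadPoolExecutor

def prefetch_iter(source, compute):
    """Lag-by-one prefetch: while the caller consumes the result for item k,
    the result for item k+1 is already being computed in the background.
    `compute` is a hypothetical stand-in for collecting a partition."""
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        it = iter(source)
        try:
            # Eagerly start the first "job", like the eager first next() above.
            pending = executor.submit(compute, next(it))
        except StopIteration:
            return  # empty source: nothing to prefetch
        for part in it:
            done = pending
            # Kick off the next job before handing back the buffered result.
            pending = executor.submit(compute, part)
            yield done.result()
        # Don't drop the final buffered result.
        yield pending.result()
    finally:
        executor.shutdown()

# usage: "collecting" a partition is simulated by doubling it
list(prefetch_iter(range(4), lambda p: p * 2))  # [0, 2, 4, 6]
```

As with the Scala sketch, consuming element k is what triggers computation of element k+1, so exactly one extra job is in flight at a time.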