Github user ericl commented on a diff in the pull request:
https://github.com/apache/spark/pull/14854#discussion_r76841791
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1331,6 +1333,129 @@ abstract class RDD[T: ClassTag](
}
}
+  private[spark] def takeOnline[R: ClassTag](
+      num: Int,
+      unpackPartition: Array[T] => Iterator[R]): Array[R] = withScope {
+    require(num >= 0)
+    val totalPartitions = partitions.length
+    var partitionsScanned = 0
+    var gotEnoughRows: Boolean = num == 0
+    val lock = new Object()
+    // This buffer accumulates the rows to be returned.
+    val resultToReturn = new ArrayBuffer[R]
+    // In order to preserve the behavior of the old `take()` implementation, it's important that
+    // we process partitions in order of their partition ids. Partitions may be computed out of
+    // order. Once we have received all partitions up to partition N then we can perform driver-side
+    // processing on partitions 1 through N to determine whether we've received enough items.
+
+    // This bitset tracks which partitions have been computed. We don't have to worry about
+    // recomputations or speculative tasks because the DAGScheduler ensures that our result handler
+    // will only be called once per partition.
+    val partitionStatuses = new java.util.BitSet(totalPartitions)
+    // We'll use the convention that a set bit denotes the _absence_ of a partition's result.
+    // This is done in order to let us call `nextSetBit()` to find the "frontier" between the prefix
+    // of partitions that have been computed and the first outstanding partition.
+    partitionStatuses.set(0, totalPartitions, true)
+
+ // The "frontier" is the the largest partitionId such that all earlier
partitions have
+ // been computed. If all partitions have been computed, the frontier
is -1.
+    var frontier: Int = 0
+
+    // Because partitions may arrive out-of-order, we need to buffer partitions' output until we have
+    // received all preceding partitions.
+    val bufferedPartitions = new mutable.HashMap[Int, Array[T]]() // key is partition id
+
+    // This callback is invoked as individual partitions complete.
+    def handleResult(partitionId: Int, result: Array[T]): Unit = lock.synchronized {
+      if (gotEnoughRows) {
+        logDebug(s"Ignoring result for partition $partitionId of $this because we have enough rows")
+      } else {
+        logDebug(s"Handling result for partition $partitionId of $this")
+        // Mark the partition as completed and buffer the result in case we can't process it now.
+        partitionStatuses.clear(partitionId)
+        bufferedPartitions(partitionId) = result
+        // Determine whether we can process buffered results, which we can only do if the frontier
+        // partition was received (since frontier is always the first outstanding partition)
+        assert(frontier != -1)
+        if (partitionId == frontier) {
+          frontier = partitionStatuses.nextSetBit(0)
+          val partitionsIter = bufferedPartitions
+            .keySet.filter { pid => pid < frontier || frontier == -1 }.toSeq.sorted.iterator
+          while (!gotEnoughRows && partitionsIter.hasNext) {
+            val partitionToUnpack = partitionsIter.next()
+            logDebug(s"Unpacking partition $partitionToUnpack of $this")
+            val rawPartitionData = bufferedPartitions.remove(partitionToUnpack).get
+            resultToReturn ++= unpackPartition(rawPartitionData)
+            if (resultToReturn.size >= num) {
+              // We have unpacked enough results to reach the desired number of results, so discard
+              // any remaining partitions' data:
+              bufferedPartitions.clear()
+              // Set a flag so that future task completion events are ignored:
+              gotEnoughRows = true
+              // Notify the main thread so that it can interrupt the job:
+              lock.notifyAll()
--- End diff --
Makes sense. Then you could probably just call `jobFuture.result` in that
loop.
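
For context, here is a rough sketch of the driver-side wait that this comment refers to. The job submission and wait loop sit outside the posted hunk, so their exact shape is an assumption; the sketch reuses `sc.submitJob` and the `handleResult`, `lock`, `gotEnoughRows`, and `resultToReturn` names from the snippet above.

    // Hypothetical driver-side portion of takeOnline (not part of the posted hunk).
    // Submit one job over all partitions; handleResult runs as each partition finishes.
    val jobFuture = sc.submitJob(
      this,
      (it: Iterator[T]) => it.toArray,
      0 until totalPartitions,
      handleResult,
      ())
    lock.synchronized {
      // Wake up when handleResult signals that enough rows were unpacked, or poll
      // periodically in case the job finishes with fewer than `num` rows overall.
      while (!gotEnoughRows && !jobFuture.isCompleted) {
        lock.wait(100)
      }
    }
    // If we stopped early, cancel the remaining tasks before returning the prefix.
    if (!jobFuture.isCompleted) {
      jobFuture.cancel()
    }
    resultToReturn.take(num).toArray

Waiting on the job future directly, as suggested, would remove the need for the explicit `lock.wait()`/`notifyAll()` handshake in this sketch.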