GitHub user ericl commented on a diff in the pull request:
https://github.com/apache/spark/pull/14854#discussion_r76842149
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1331,6 +1333,129 @@ abstract class RDD[T: ClassTag](
}
}
+ private[spark] def takeOnline[R: ClassTag](
+ num: Int,
+ unpackPartition: Array[T] => Iterator[R]): Array[R] = withScope {
+ require(num >= 0)
+ val totalPartitions = partitions.length
+ var partitionsScanned = 0
+ var gotEnoughRows: Boolean = num == 0
+ val lock = new Object()
+ // This buffer accumulates the rows to be returned.
+ val resultToReturn = new ArrayBuffer[R]
+ // In order to preserve the behavior of the old `take()` implementation, it's important that
+ // we process partitions in order of their partition ids. Partitions may be computed out of
+ // order. Once we have received all partitions up to partition N then we can perform driver-side
+ // processing on partitions 1 through N to determine whether we've received enough items.
+
+ // This bitset tracks which partitions have been computed. We don't have to worry about
+ // recomputations or speculative tasks because the DAGScheduler ensures that our result handler
+ // will only be called once per partition.
+ val partitionStatuses = new java.util.BitSet(totalPartitions)
+ // We'll use the convention that a set bit denotes the _absence_ of a partition's result.
+ // This is done in order to let us call `nextSetBit()` to find the "frontier" between the prefix
+ // of partitions that have been computed and the first outstanding partition.
+ partitionStatuses.set(0, totalPartitions - 1, true)
--- End diff --
Btw there's also nextClearBit().
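
To illustrate the suggestion, here is a minimal sketch, not code from the PR. It assumes the more natural convention that a set bit means a partition's result has arrived, so `nextClearBit(0)` gives the frontier directly and the bitset needs no range initialization. The object name `FrontierSketch` and the helper `frontier` are hypothetical. As a side note, `BitSet.set(fromIndex, toIndex, value)` treats `toIndex` as exclusive, which matters if the inverted convention is kept.

```scala
import java.util.BitSet

object FrontierSketch {
  // Hypothetical helper: index of the first partition whose result has
  // not yet been received, assuming a set bit means "computed".
  def frontier(computed: BitSet): Int = computed.nextClearBit(0)

  def main(args: Array[String]): Unit = {
    val totalPartitions = 4
    // A fresh BitSet has all bits clear, i.e. nothing computed yet.
    val computed = new BitSet(totalPartitions)

    // Results may arrive out of order.
    computed.set(0)
    computed.set(1)
    computed.set(3)
    println(frontier(computed)) // partitions 0 and 1 form the completed prefix

    computed.set(2)
    // Once every partition is in, the frontier equals totalPartitions.
    println(frontier(computed) == totalPartitions)
  }
}
```

With this convention the driver can process partitions `0` until `frontier(computed)` in order, and the "all done" check is a comparison against `totalPartitions`.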