GitHub user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/14573#discussion_r76227554
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
     var partsScanned = 0
+
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1L
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
-        if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
+        // Otherwise, interpolate the number of partitions we need to try, but overestimate
+        // it by 50%. We also cap the estimation in the end.
+        val takeRampUpRate = sqlContext.conf.takeRampUpRate
+        if (buf.isEmpty) {
+          numPartsToTry = partsScanned * takeRampUpRate
         } else {
-          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
         }
       }
-      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

       val left = n - buf.size
       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
+      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
--- End diff --
Why change this? The childRDD is an `RDD[Array[Byte]]`; its per-partition iterator returns exactly one element (a single byte array containing the entire serialized partition).
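To make that point concrete, here is a minimal standalone sketch (an illustration only, not code from the PR; the sample bytes and the value of `left` are made up) of how the two closures behave on such a single-element iterator:

    // One partition of an RDD[Array[Byte]]: the iterator yields exactly one
    // element, a byte array holding all of that partition's serialized rows.
    def partition: Iterator[Array[Byte]] = Iterator(Array[Byte](1, 2, 3))

    // Old closure: unwrap the single element, producing an Array[Byte].
    val it = partition
    val oldResult: Array[Byte] = if (it.hasNext) it.next() else Array.empty

    // New closure: take(left) still yields that same single element for any
    // left >= 1 (left counts rows, not byte arrays), but re-wraps it,
    // producing an Array[Array[Byte]] of length <= 1.
    val left = 5
    val newResult: Array[Array[Byte]] = partition.take(left).toArray

If that premise holds, `take(left)` trims nothing; it only changes the element type that `runJob` hands back per partition.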
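As for the ramp-up logic in the diff itself, a minimal sketch as a pure function may help; `nextNumPartsToTry` is a hypothetical name, and the default rate of 4 is an assumption read off the "quadruple and retry" comment (the real value comes from `sqlContext.conf.takeRampUpRate`):

    // n: rows wanted; partsScanned: partitions already scanned;
    // bufSize: rows collected so far.
    def nextNumPartsToTry(n: Int, partsScanned: Int, bufSize: Int,
                          takeRampUpRate: Int = 4): Long = {
      if (partsScanned == 0) {
        1L  // first iteration: scan a single partition
      } else if (bufSize == 0) {
        // no rows yet: grow geometrically instead of jumping to all partitions
        partsScanned.toLong * takeRampUpRate
      } else {
        // interpolate from the observed row density, overestimate by 50%,
        // then cap the growth at the same ramp-up rate
        val interpolated = Math.max((1.5 * n * partsScanned / bufSize).toInt - partsScanned, 1)
        Math.min(interpolated.toLong, partsScanned.toLong * takeRampUpRate)
      }
    }

For example, with n = 100, one partition scanned and one row found, the interpolation gives max(150 - 1, 1) = 149, which the cap reduces to 1 * 4 = 4 partitions for the next pass.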