HyukjinKwon commented on a change in pull request #26809: [SPARK-30185][SQL] Implement Dataset.tail API
URL: https://github.com/apache/spark/pull/26809#discussion_r357003666
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
##########
@@ -426,23 +456,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         }
       }
-      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val partsToScan = if (reverse) {
+        // Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
+        // it becomes [198, 197, 196].
+        parts.map(p => (totalParts - 1) - p)
+      } else {
+        parts
+      }
       val sc = sqlContext.sparkContext
       val res = sc.runJob(childRDD, (it: Iterator[(Long, Array[Byte])]) =>
-        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), p)
+        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), partsToScan)
       var i = 0
-      while (buf.length < n && i < res.length) {
-        val rows = decodeUnsafeRows(res(i)._2)
-        val rowsToTake = if (n - buf.length >= res(i)._1) {
-          rows.toArray
-        } else {
-          rows.take(n - buf.length).toArray
+
+      if (reverse) {
+        while (buf.length < n && i < res.length) {
+          val rows = decodeUnsafeRows(res(i)._2)
+          if (n - buf.length >= res(i)._1) {
+            buf.insertAll(0, rows.toArray[InternalRow])
Review comment:
Hm, sorry if I misunderstood you guys. This is what I am trying to do here:
<img
src="https://user-images.githubusercontent.com/6477701/70691881-86acda00-1cfd-11ea-94e9-c98146618fc5.png"
width="350" height="400" />
Let's say we execute `tail` to take roughly the last half of the rows. The current PR collects partitions from the end (because otherwise we don't know how many rows to take from which partitions). The resulting scan order is sketched right after the steps below.
1. It will collect data from partition 5 first.
<img
src="https://user-images.githubusercontent.com/6477701/70693447-d5a83e80-1d00-11ea-84d8-1144f3dd2d09.png"
width="350" height="100"/>
2. We will then collect data from partition 4 and partition 3 (say `spark.sql.limit.scaleUpFactor` is 2).
3. When we read partition 3 and the collected row count reaches `n`, we finally stop reading here.
<img
src="https://user-images.githubusercontent.com/6477701/70693464-d80a9880-1d00-11ea-9a95-6300ba198827.png"
width="350" height="200"/>
Since we have to scan sequentially from the last partition like this, I think prepending is inevitable (if we want to avoid creating another array via concatenation or other extra operations).
Let me just change it to `ListBuffer` and prepend, per
https://docs.scala-lang.org/overviews/collections/performance-characteristics.html. It seems to be a minimal change.
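For reference, a minimal sketch (not the PR code, with made-up row values) of why prepending into a `ListBuffer` restores the original row order when partitions are processed back to front:

```scala
import scala.collection.mutable.ListBuffer

// Rows per partition, in the original partition order; the values are made up.
val part3 = Seq("r0", "r1")
val part4 = Seq("r2", "r3")
val part5 = Seq("r4", "r5")

val buf = new ListBuffer[String]
// tail scans the last partition first; prepending each decoded chunk keeps
// the rows in their original order without concatenating arrays.
Seq(part5, part4, part3).foreach(rows => buf.insertAll(0, rows))

assert(buf.toList == List("r0", "r1", "r2", "r3", "r4", "r5"))
```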
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]