sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659191689
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,107 @@
/** The remaining number of values to read in the current batch */
int valuesToReadInBatch;
- ParquetReadState(int maxDefinitionLevel) {
+ ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong
rowIndexes) {
this.maxDefinitionLevel = maxDefinitionLevel;
+ this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+ nextRange();
}
/**
- * Called at the beginning of reading a new batch.
+ * Construct a list of row ranges from the given `rowIndexes`. For example,
suppose the
+ * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3
row ranges:
+ * `[0-2], [4-5], [7-9]`.
*/
- void resetForBatch(int batchSize) {
+ private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong
rowIndexes) {
+ List<RowRange> rowRanges = new ArrayList<>();
+ long currentStart = Long.MIN_VALUE;
+ long previous = Long.MIN_VALUE;
+
+ while (rowIndexes.hasNext()) {
+ long idx = rowIndexes.nextLong();
+ if (previous == Long.MIN_VALUE) {
+ currentStart = previous = idx;
+ } else if (previous + 1 != idx) {
+ RowRange range = new RowRange(currentStart, previous);
+ rowRanges.add(range);
+ currentStart = previous = idx;
+ } else {
+ previous = idx;
+ }
Review comment:
Yeah good point. Will change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]