dongjoon-hyun commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657655917
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,102 @@
/** The remaining number of values to read in the current batch */
int valuesToReadInBatch;
- ParquetReadState(int maxDefinitionLevel) {
+ ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong
rowIndexes) {
this.maxDefinitionLevel = maxDefinitionLevel;
+ this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+ nextRange();
+ }
+
+ private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong
rowIndexes) {
+ List<RowRange> rowRanges = new ArrayList<>();
+ long currentStart, previous;
+ currentStart = previous = Long.MIN_VALUE;
+
+ while (rowIndexes.hasNext()) {
+ long idx = rowIndexes.nextLong();
+ if (previous == Long.MIN_VALUE) {
+ currentStart = previous = idx;
+ } else if (previous + 1 != idx) {
+ RowRange range = new RowRange(currentStart, previous);
+ rowRanges.add(range);
+ currentStart = previous = idx;
+ } else {
+ previous = idx;
+ }
+ }
+
+ if (previous != Long.MIN_VALUE) {
+ rowRanges.add(new RowRange(currentStart, previous));
+ }
+
+ return rowRanges.iterator();
}
/**
- * Called at the beginning of reading a new batch.
+ * Must be called at the beginning of reading a new batch.
*/
- void resetForBatch(int batchSize) {
+ void resetForNewBatch(int batchSize) {
Review comment:
Ya, new names (resetForNewBatch/resetForNewPage) look clear and better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]