LuciferYang commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r840675154
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -210,48 +215,379 @@ private void readBatchInternal(
} else if (rowId > rangeEnd) {
state.nextRange();
} else {
- // the range [rowId, rowId + n) overlaps with the current row range in
state
+ // The range [rowId, rowId + n) overlaps with the current row range in
state
long start = Math.max(rangeStart, rowId);
long end = Math.min(rangeEnd, rowId + n - 1);
- // skip the part [rowId, start)
+ // Skip the part [rowId, start)
int toSkip = (int) (start - rowId);
if (toSkip > 0) {
skipValues(toSkip, state, valueReader, updater);
rowId += toSkip;
leftInPage -= toSkip;
}
- // read the part [start, end]
+ // Read the part [start, end]
n = (int) (end - start + 1);
switch (mode) {
case RLE:
if (currentValue == state.maxDefinitionLevel) {
- updater.readValues(n, offset, values, valueReader);
- } else {
- nulls.putNulls(offset, n);
+ updater.readValues(n, state.valueOffset, values, valueReader);
+ state.valueOffset += n;
+ } else if (!state.isRequired && currentValue ==
state.maxDefinitionLevel - 1) {
+ // Only add null if this represents a null element, but not for
the case where a
+ // struct itself is null
+ nulls.putNulls(state.valueOffset, n);
+ state.valueOffset += n;
}
+ defLevels.putInts(state.levelOffset, n, currentValue);
break;
case PACKED:
for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] ==
state.maxDefinitionLevel) {
- updater.readValue(offset + i, values, valueReader);
+ int value = currentBuffer[currentBufferIdx++];
+ if (value == state.maxDefinitionLevel) {
+ updater.readValue(state.valueOffset++, values, valueReader);
} else {
- nulls.putNull(offset + i);
+ // Only add null if this represents a null element, but not
for the case where a
+ // struct itself is null
+ nulls.putNull(state.valueOffset++);
}
+ defLevels.putInt(state.levelOffset + i, value);
}
break;
}
- offset += n;
+ state.levelOffset += n;
leftInBatch -= n;
rowId += n;
leftInPage -= n;
currentCount -= n;
+ defLevels.addElementsAppended(n);
}
}
- state.advanceOffsetAndRowId(offset, rowId);
+ state.rowsToReadInBatch = leftInBatch;
+ state.valuesToReadInPage = leftInPage;
+ state.rowId = rowId;
+ }
+
+ /**
+ * Reads a batch of repetition levels, definition levels and values into
'repLevels',
+ * 'defLevels' and 'values' respectively. The definition levels and values
are read via
+ * 'defLevelsReader' and 'valueReader' respectively.
+ * <p>
+ * The related states such as row index, offset, number of rows left in the
batch and page,
+ * are tracked by 'state'. The type-specific 'updater' is used to update or
skip values.
+ */
+ public void readBatchRepeated(
+ ParquetReadState state,
+ WritableColumnVector repLevels,
+ VectorizedRleValuesReader defLevelsReader,
+ WritableColumnVector defLevels,
+ WritableColumnVector values,
+ VectorizedValuesReader valueReader,
+ ParquetVectorUpdater updater) {
+ readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels,
values, values, true,
+ valueReader, updater);
+ }
+
+ /**
+ * Reads a batch of repetition levels, definition levels and integer values
into 'repLevels',
+ * 'defLevels', 'values' and 'nulls' respectively. The definition levels and
values are read via
+ * 'defLevelsReader' and 'valueReader' respectively.
+ * <p>
+ * The 'values' vector is used to hold non-null values, while 'nulls' vector
is used to hold
+ * null values.
+ * <p>
+ * The related states such as row index, offset, number of rows left in the
batch and page,
+ * are tracked by 'state'.
+ * <p>
+ * Unlike 'readBatchRepeated', this is used to decode dictionary indices in
dictionary encoding.
+ */
+ public void readIntegersRepeated(
+ ParquetReadState state,
+ WritableColumnVector repLevels,
+ VectorizedRleValuesReader defLevelsReader,
+ WritableColumnVector defLevels,
+ WritableColumnVector values,
+ WritableColumnVector nulls,
+ VectorizedValuesReader valueReader) {
+ readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels,
values, nulls, false,
+ valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater());
+ }
+
+ /**
+ * Keep reading repetition level values from the page until either: 1) we've
read enough
+ * top-level rows to fill the current batch, or 2) we've drained the data
page completely.
+ *
+ * @param valuesReused whether 'values' vector is reused for 'nulls'
+ */
+ public void readBatchRepeatedInternal(
+ ParquetReadState state,
+ WritableColumnVector repLevels,
+ VectorizedRleValuesReader defLevelsReader,
+ WritableColumnVector defLevels,
+ WritableColumnVector values,
+ WritableColumnVector nulls,
+ boolean valuesReused,
+ VectorizedValuesReader valueReader,
+ ParquetVectorUpdater updater) {
+
+ int leftInBatch = state.rowsToReadInBatch;
+ int leftInPage = state.valuesToReadInPage;
+ long rowId = state.rowId;
+
+ DefLevelProcessor defLevelProcessor = new
DefLevelProcessor(defLevelsReader, state, defLevels,
+ values, nulls, valuesReused, valueReader, updater);
+
+ while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
+ if (currentCount == 0 && !readNextGroup()) break;
+
+ // Values to read in the current RLE/PACKED block, must be <= what's
left in the page
+ int valuesLeftInBlock = Math.min(leftInPage, currentCount);
+
+ // The current row range start and end
+ long rangeStart = state.currentRangeStart();
+ long rangeEnd = state.currentRangeEnd();
+
+ switch (mode) {
+ case RLE:
+ // This RLE block is consist of top-level rows, so we'll need to
check
+ // if the rows should be skipped according to row indexes.
+ if (currentValue == 0) {
+ if (leftInBatch == 0) {
+ state.lastListCompleted = true;
+ } else {
+ // # of rows to read in the block, must be <= what's left in the
current batch
+ int n = Math.min(leftInBatch, valuesLeftInBlock);
+
+ if (rowId + n < rangeStart) {
+ // Need to skip all rows in [rowId, rowId + n)
+ defLevelProcessor.skipValues(n);
+ rowId += n;
+ currentCount -= n;
+ leftInPage -= n;
+ } else if (rowId > rangeEnd) {
+ // The current row index already beyond the current range:
move to the next range
+ // and repeat
+ state.nextRange();
+ } else {
+ // The range [rowId, rowId + n) overlaps with the current row
range
+ long start = Math.max(rangeStart, rowId);
+ long end = Math.min(rangeEnd, rowId + n - 1);
+
+ // Skip the rows in [rowId, start)
+ int toSkip = (int) (start - rowId);
+ if (toSkip > 0) {
+ defLevelProcessor.skipValues(toSkip);
+ rowId += toSkip;
+ currentCount -= toSkip;
+ leftInPage -= toSkip;
+ }
+
+ // Read the rows in [start, end]
+ n = (int) (end - start + 1);
+
+ if (n > 0) {
+ repLevels.appendInts(n, 0);
+ defLevelProcessor.readValues(n);
+ }
+
+ rowId += n;
+ currentCount -= n;
+ leftInBatch -= n;
+ leftInPage -= n;
+ }
+ }
+ } else {
+ // Not a top-level row: just read all the repetition levels in the
block if the row
+ // should be included according to row indexes, else skip the rows.
+ if (!state.shouldSkip) {
+ repLevels.appendInts(valuesLeftInBlock, currentValue);
+ }
+ state.numBatchedDefLevels += valuesLeftInBlock;
+ leftInPage -= valuesLeftInBlock;
+ currentCount -= valuesLeftInBlock;
+ }
+ break;
+ case PACKED:
+ int i = 0;
+
+ for (; i < valuesLeftInBlock; i++) {
+ int currentValue = currentBuffer[currentBufferIdx + i];
+ if (currentValue == 0) {
+ if (leftInBatch == 0) {
+ state.lastListCompleted = true;
+ break;
+ } else if (rowId < rangeStart) {
+ // This is a top-level row, therefore check if we should skip
it with row indexes
+ // the row is before the current range, skip it
+ defLevelProcessor.skipValues(1);
+ } else if (rowId > rangeEnd) {
+ // The row is after the current range, move to the next range
and compare again
+ state.nextRange();
+ break;
+ } else {
+ // The row is in the current range, decrement the row counter
and read it
+ leftInBatch--;
+ repLevels.appendInt(0);
+ defLevelProcessor.readValues(1);
+ }
+ rowId++;
+ } else {
+ if (!state.shouldSkip) {
+ repLevels.appendInt(currentValue);
+ }
+ state.numBatchedDefLevels += 1;
+ }
+ }
+
+ leftInPage -= i;
+ currentCount -= i;
+ currentBufferIdx += i;
+ break;
+ }
+ }
+
+ // Process all the batched def levels
+ defLevelProcessor.finish();
+
+ state.rowsToReadInBatch = leftInBatch;
+ state.valuesToReadInPage = leftInPage;
+ state.rowId = rowId;
+ }
+
+ private static class DefLevelProcessor {
+ private final VectorizedRleValuesReader reader;
+ private final ParquetReadState state;
+ private final WritableColumnVector defLevels;
+ private final WritableColumnVector values;
+ private final WritableColumnVector nulls;
+ private final boolean valuesReused;
+ private final VectorizedValuesReader valueReader;
+ private final ParquetVectorUpdater updater;
+
+ DefLevelProcessor(
+ VectorizedRleValuesReader reader,
+ ParquetReadState state,
+ WritableColumnVector defLevels,
+ WritableColumnVector values,
+ WritableColumnVector nulls,
+ boolean valuesReused,
+ VectorizedValuesReader valueReader,
+ ParquetVectorUpdater updater) {
+ this.reader = reader;
+ this.state = state;
+ this.defLevels = defLevels;
+ this.values = values;
+ this.nulls = nulls;
+ this.valuesReused = valuesReused;
+ this.valueReader = valueReader;
+ this.updater = updater;
+ }
+
+ void readValues(int n) {
+ if (!state.shouldSkip) {
+ state.numBatchedDefLevels += n;
+ } else {
+ reader.skipValues(state.numBatchedDefLevels, state, valueReader,
updater);
+ state.numBatchedDefLevels = n;
+ state.shouldSkip = false;
+ }
+ }
+
+ void skipValues(int n) {
+ if (state.shouldSkip) {
+ state.numBatchedDefLevels += n;
+ } else {
+ reader.readValues(state.numBatchedDefLevels, state, defLevels, values,
nulls, valuesReused,
+ valueReader, updater);
+ state.numBatchedDefLevels = n;
+ state.shouldSkip = true;
+ }
+ }
+
+ void finish() {
+ if (state.numBatchedDefLevels > 0) {
+ if (state.shouldSkip) {
+ reader.skipValues(state.numBatchedDefLevels, state, valueReader,
updater);
+ } else {
+ reader.readValues(state.numBatchedDefLevels, state, defLevels,
values, nulls,
+ valuesReused, valueReader, updater);
+ }
+ state.numBatchedDefLevels = 0;
+ }
+ }
+ }
+
+ /**
+ * Read the next 'total' values (either null or non-null) from this
definition level reader and
+ * 'valueReader'. The definition levels are read into 'defLevels'. If a
value is not
+ * null, it is appended to 'values'. Otherwise, a null bit will be set in
'nulls'.
+ *
+ * This is only used when reading repeated values.
+ */
+ private void readValues(
+ int total,
+ ParquetReadState state,
+ WritableColumnVector defLevels,
+ WritableColumnVector values,
+ WritableColumnVector nulls,
+ boolean valuesReused,
+ VectorizedValuesReader valueReader,
+ ParquetVectorUpdater updater) {
+
+ defLevels.reserveAdditional(total);
+ values.reserveAdditional(total);
+ if (!valuesReused) {
+ // 'nulls' is a separate column vector so we'll have to reserve it
separately
+ nulls.reserveAdditional(total);
+ }
+
+ int n = total;
+ int initialValueOffset = state.valueOffset;
+ while (n > 0) {
+ if (currentCount == 0 && !readNextGroup()) break;
+ int num = Math.min(n, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == state.maxDefinitionLevel) {
+ updater.readValues(num, state.valueOffset, values, valueReader);
+ state.valueOffset += num;
+ } else {
+ // Only add null if this represents a null element, but not the
case when a
+ // collection is null or empty.
+ nulls.putNulls(state.valueOffset, num);
+ state.valueOffset += num;
+ }
+ defLevels.putInts(state.levelOffset, num, currentValue);
+ break;
+ case PACKED:
Review comment:
line 565 ~ 575 looks same as line 247 ~ 257 except for the comments
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]