agrawaldevesh commented on a change in pull request #34659: URL: https://github.com/apache/spark/pull/34659#discussion_r757856767
########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); - } else { - nulls.putNulls(offset, n); + updater.readValues(n, state.valueOffset, values, valueReader); + state.valueOffset += n; + } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNulls(state.valueOffset, n); + state.valueOffset += n; } + defLevels.putInts(state.levelOffset, n, currentValue); break; case PACKED: for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); - } else { - nulls.putNull(offset + i); + int v = currentBuffer[currentBufferIdx++]; + if (v == state.maxDefinitionLevel) { + updater.readValue(state.valueOffset++, values, valueReader); + } else if (!state.isRequired && v == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNull(state.valueOffset++); } + defLevels.putInt(state.levelOffset + i, v); } break; } - offset += n; + state.levelOffset += n; leftInBatch -= n; rowId += n; leftInPage -= n; currentCount -= n; + defLevels.addElementsAppended(n); + } + } + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } + + public void readBatchNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + readBatchNestedInternal(state, repLevels, 
defLevelsReader, defLevels, values, values, true, + valueReader, updater); + } + + public void readIntegersNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false, + valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + /** + * Keep reading repetition level values from the page until either: 1) we've read enough + * top-level rows to fill the current batch, or 2) we've drained the data page completely. + * + * @param valuesReused whether 'values' vector is reused for 'nulls' + */ + public void readBatchNestedInternal( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + int leftInBatch = state.rowsToReadInBatch; + int leftInPage = state.valuesToReadInPage; + long rowId = state.rowId; + + DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels, + values, nulls, valuesReused, valueReader, updater); + + while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) { + if (currentCount == 0 && !readNextGroup()) break; + + // values to read in the current RLE/PACKED block, must be <= what's left in the page + int valuesLeftInBlock = Math.min(leftInPage, currentCount); + + // the current row range start and end + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + switch (mode) { + case RLE: + // this RLE block is consist of top-level rows, so we'll need to check + // if the rows should be skipped according to row indexes. 
+ if (currentValue == 0) { + if (leftInBatch == 0) { + state.lastListCompleted = true; + } else { + // # of rows to read in the block, must be <= what's left in the current batch + int n = Math.min(leftInBatch, valuesLeftInBlock); + + if (rowId + n < rangeStart) { + // need to skip all rows in [rowId, rowId + n) + defLevelProcessor.skipValues(n); + rowId += n; + currentCount -= n; + leftInPage -= n; Review comment: should leftInBatch be updated here as well ? ########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); - } else { - nulls.putNulls(offset, n); + updater.readValues(n, state.valueOffset, values, valueReader); + state.valueOffset += n; + } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNulls(state.valueOffset, n); + state.valueOffset += n; } + defLevels.putInts(state.levelOffset, n, currentValue); break; case PACKED: for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); - } else { - nulls.putNull(offset + i); + int v = currentBuffer[currentBufferIdx++]; + if (v == state.maxDefinitionLevel) { + updater.readValue(state.valueOffset++, values, valueReader); + } else if (!state.isRequired && v == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNull(state.valueOffset++); } + defLevels.putInt(state.levelOffset + i, v); } break; } - offset += n; + state.levelOffset += n; leftInBatch -= n; rowId += n; leftInPage -= n; currentCount -= n; + 
defLevels.addElementsAppended(n); + } + } + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } + + public void readBatchNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, values, true, + valueReader, updater); + } + + public void readIntegersNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false, + valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + /** + * Keep reading repetition level values from the page until either: 1) we've read enough + * top-level rows to fill the current batch, or 2) we've drained the data page completely. 
+ * + * @param valuesReused whether 'values' vector is reused for 'nulls' + */ + public void readBatchNestedInternal( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + int leftInBatch = state.rowsToReadInBatch; + int leftInPage = state.valuesToReadInPage; + long rowId = state.rowId; + + DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels, + values, nulls, valuesReused, valueReader, updater); + + while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) { + if (currentCount == 0 && !readNextGroup()) break; + + // values to read in the current RLE/PACKED block, must be <= what's left in the page + int valuesLeftInBlock = Math.min(leftInPage, currentCount); + + // the current row range start and end + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + switch (mode) { + case RLE: + // this RLE block is consist of top-level rows, so we'll need to check + // if the rows should be skipped according to row indexes. 
+ if (currentValue == 0) { + if (leftInBatch == 0) { + state.lastListCompleted = true; + } else { + // # of rows to read in the block, must be <= what's left in the current batch + int n = Math.min(leftInBatch, valuesLeftInBlock); + + if (rowId + n < rangeStart) { + // need to skip all rows in [rowId, rowId + n) + defLevelProcessor.skipValues(n); + rowId += n; + currentCount -= n; + leftInPage -= n; + } else if (rowId > rangeEnd) { + // the current row index already beyond the current range: move to the next range + // and repeat + state.nextRange(); + } else { + // the range [rowId, rowId + n) overlaps with the current row range + long start = Math.max(rangeStart, rowId); + long end = Math.min(rangeEnd, rowId + n - 1); + + // skip the rows in [rowId, start) + int toSkip = (int) (start - rowId); + if (toSkip > 0) { + defLevelProcessor.skipValues(toSkip); + rowId += toSkip; + currentCount -= toSkip; + leftInPage -= toSkip; + } + + // read the rows in [start, end] + n = (int) (end - start + 1); + + leftInBatch -= n; + if (n > 0) { + repLevels.appendInts(n, 0); + defLevelProcessor.readValues(n); + } + + rowId += n; Review comment: let's move leftInBatch update here.
I am not sure why some of these are local variables vs member vars :-) ########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); - } else { - nulls.putNulls(offset, n); + updater.readValues(n, state.valueOffset, values, valueReader); + state.valueOffset += n; + } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNulls(state.valueOffset, n); + state.valueOffset += n; } + defLevels.putInts(state.levelOffset, n, currentValue); break; case PACKED: for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); - } else { - nulls.putNull(offset + i); + int v = currentBuffer[currentBufferIdx++]; + if (v == state.maxDefinitionLevel) { + updater.readValue(state.valueOffset++, values, valueReader); + } else if (!state.isRequired && v == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNull(state.valueOffset++); } + defLevels.putInt(state.levelOffset + i, v); } break; } - offset += n; + state.levelOffset += n; leftInBatch -= n; rowId += n; leftInPage -= n; currentCount -= n; + defLevels.addElementsAppended(n); + } + } + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } + + public void readBatchNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + VectorizedValuesReader valueReader, + 
ParquetVectorUpdater updater) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, values, true, + valueReader, updater); + } + + public void readIntegersNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false, + valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + /** + * Keep reading repetition level values from the page until either: 1) we've read enough + * top-level rows to fill the current batch, or 2) we've drained the data page completely. + * + * @param valuesReused whether 'values' vector is reused for 'nulls' + */ + public void readBatchNestedInternal( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + int leftInBatch = state.rowsToReadInBatch; + int leftInPage = state.valuesToReadInPage; + long rowId = state.rowId; + + DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels, + values, nulls, valuesReused, valueReader, updater); + + while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) { + if (currentCount == 0 && !readNextGroup()) break; Review comment: The original non-nested code refers to currentCount as this.currentCount in certain places. Should we pick that style as well?
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ########## @@ -897,6 +897,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED = + buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader") + .doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " + + s"Note to enable this ${PARQUET_VECTORIZED_READER_ENABLED.key} also needs to be enabled.") + .version("3.3.0") + .booleanConf + .createWithDefault(true) Review comment: I am liking the confidence :-) ########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); - } else { - nulls.putNulls(offset, n); + updater.readValues(n, state.valueOffset, values, valueReader); + state.valueOffset += n; + } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNulls(state.valueOffset, n); + state.valueOffset += n; } + defLevels.putInts(state.levelOffset, n, currentValue); break; case PACKED: for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); - } else { - nulls.putNull(offset + i); + int v = currentBuffer[currentBufferIdx++]; Review comment: v -> value ? 
########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); - } else { - nulls.putNulls(offset, n); + updater.readValues(n, state.valueOffset, values, valueReader); + state.valueOffset += n; + } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNulls(state.valueOffset, n); + state.valueOffset += n; } + defLevels.putInts(state.levelOffset, n, currentValue); break; case PACKED: for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); - } else { - nulls.putNull(offset + i); + int v = currentBuffer[currentBufferIdx++]; + if (v == state.maxDefinitionLevel) { + updater.readValue(state.valueOffset++, values, valueReader); + } else if (!state.isRequired && v == state.maxDefinitionLevel - 1) { + // only add null if this represents a null element, but not for the case where a + // struct itself is null + nulls.putNull(state.valueOffset++); } + defLevels.putInt(state.levelOffset + i, v); } break; } - offset += n; + state.levelOffset += n; leftInBatch -= n; rowId += n; leftInPage -= n; currentCount -= n; + defLevels.addElementsAppended(n); + } + } + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } + + public void readBatchNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + readBatchNestedInternal(state, repLevels, 
defLevelsReader, defLevels, values, values, true, + valueReader, updater); + } + + public void readIntegersNested( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader) { + readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false, + valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + /** + * Keep reading repetition level values from the page until either: 1) we've read enough + * top-level rows to fill the current batch, or 2) we've drained the data page completely. + * + * @param valuesReused whether 'values' vector is reused for 'nulls' + */ + public void readBatchNestedInternal( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + int leftInBatch = state.rowsToReadInBatch; + int leftInPage = state.valuesToReadInPage; + long rowId = state.rowId; + + DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels, + values, nulls, valuesReused, valueReader, updater); + + while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) { + if (currentCount == 0 && !readNextGroup()) break; + + // values to read in the current RLE/PACKED block, must be <= what's left in the page + int valuesLeftInBlock = Math.min(leftInPage, currentCount); + + // the current row range start and end + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + switch (mode) { + case RLE: + // this RLE block is consist of top-level rows, so we'll need to check + // if the rows should be skipped according to row indexes. 
+ if (currentValue == 0) { + if (leftInBatch == 0) { + state.lastListCompleted = true; + } else { + // # of rows to read in the block, must be <= what's left in the current batch + int n = Math.min(leftInBatch, valuesLeftInBlock); + + if (rowId + n < rangeStart) { + // need to skip all rows in [rowId, rowId + n) + defLevelProcessor.skipValues(n); + rowId += n; + currentCount -= n; + leftInPage -= n; + } else if (rowId > rangeEnd) { + // the current row index already beyond the current range: move to the next range + // and repeat + state.nextRange(); + } else { + // the range [rowId, rowId + n) overlaps with the current row range + long start = Math.max(rangeStart, rowId); + long end = Math.min(rangeEnd, rowId + n - 1); + + // skip the rows in [rowId, start) + int toSkip = (int) (start - rowId); + if (toSkip > 0) { + defLevelProcessor.skipValues(toSkip); + rowId += toSkip; + currentCount -= toSkip; + leftInPage -= toSkip; + } + + // read the rows in [start, end] + n = (int) (end - start + 1); + + leftInBatch -= n; + if (n > 0) { + repLevels.appendInts(n, 0); Review comment: Not sure, but should repLevels be immutable for the purpose of this function? ########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ########## @@ -227,30 +230,340 @@ private void readBatchInternal( switch (mode) { case RLE: Review comment: I am not too familiar with this code, but where do we assert that these are the only two encoding choices? There is no default arm in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org