cloud-fan commented on a change in pull request #33006:
URL: https://github.com/apache/spark/pull/33006#discussion_r655896169
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -174,24 +162,29 @@ void readBatch(int total, WritableColumnVector column)
throws IOException {
// page.
dictionaryIds = column.reserveDictionaryIds(total);
}
- while (total > 0) {
+ readState.resetForBatch(total);
+ while (readState.valuesToReadInBatch > 0) {
// Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
+ if (readState.valuesToReadInPage == 0) {
readPage();
- leftInPage = (int) (endOfPageValueCount - valuesRead);
+ readState.resetForPage(pageValueCount);
}
- int num = Math.min(total, leftInPage);
PrimitiveType.PrimitiveTypeName typeName =
descriptor.getPrimitiveType().getPrimitiveTypeName();
if (isCurrentPageDictionaryEncoded) {
+ boolean supportLazyDecoding = readState.offset == 0 &&
+ isLazyDecodingSupported(typeName);
+
+ // Save starting offset in case we need to decode dictionary IDs.
+ int startOffset = readState.offset;
+
// Read and decode dictionary ids.
- defColumn.readIntegers(
- num, dictionaryIds, column, rowId, maxDefLevel,
(VectorizedValuesReader) dataColumn);
+ defColumn.readIntegers(readState, dictionaryIds, column,
+ (VectorizedValuesReader) dataColumn);
// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we
need to post process
// the values to add microseconds precision.
- if (column.hasDictionary() || (rowId == 0 &&
isLazyDecodingSupported(typeName))) {
+ if (column.hasDictionary() || supportLazyDecoding) {
Review comment:
looks OK, it's just a few CPU cycles.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -174,24 +162,29 @@ void readBatch(int total, WritableColumnVector column)
throws IOException {
// page.
dictionaryIds = column.reserveDictionaryIds(total);
}
- while (total > 0) {
+ readState.resetForBatch(total);
+ while (readState.valuesToReadInBatch > 0) {
// Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
+ if (readState.valuesToReadInPage == 0) {
readPage();
Review comment:
Can we let `readPage` return the value count? Then we don't need the
state `pageValueCount`.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -199,48 +200,51 @@ public void readBatch(
break;
}
offset += n;
- left -= n;
currentCount -= n;
}
+
+ state.advanceOffset(offset);
}
/**
* Decoding for dictionary ids. The IDs are populated into `values` and the
nullability is
* populated into `nulls`.
*/
public void readIntegers(
- int total,
+ ParquetReadState state,
WritableColumnVector values,
WritableColumnVector nulls,
- int rowId,
- int level,
VectorizedValuesReader data) throws IOException {
- int left = total;
- while (left > 0) {
+ int offset = state.offset;
+
+ while (state.hasMoreInPage(offset)) {
Review comment:
This looks inefficient. Can we follow the old code and calculate the
`left` first, then do `while (left > 0)`?
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -199,48 +200,51 @@ public void readBatch(
break;
}
offset += n;
- left -= n;
currentCount -= n;
}
+
+ state.advanceOffset(offset);
}
/**
* Decoding for dictionary ids. The IDs are populated into `values` and the
nullability is
* populated into `nulls`.
*/
public void readIntegers(
- int total,
+ ParquetReadState state,
WritableColumnVector values,
WritableColumnVector nulls,
- int rowId,
- int level,
VectorizedValuesReader data) throws IOException {
- int left = total;
- while (left > 0) {
+ int offset = state.offset;
+
+ while (state.hasMoreInPage(offset)) {
Review comment:
e.g.
```
int offset = state.offset;
int left = Math.min(state.valuesToReadInPage, state.valuesToReadInBatch);
while (left > 0) {
...
offset += n;
left -= n;
}
```
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -216,53 +195,49 @@ void readBatch(int total, WritableColumnVector column)
throws IOException {
boolean needTransform = castLongToInt || isUnsignedInt32 ||
isUnsignedInt64;
column.setDictionary(new ParquetDictionary(dictionary,
needTransform));
} else {
- updater.decodeDictionaryIds(num, rowId, column, dictionaryIds,
dictionary);
+ updater.decodeDictionaryIds(readState.offset - startOffset,
startOffset, column,
+ dictionaryIds, dictionary);
}
} else {
- if (column.hasDictionary() && rowId != 0) {
+ if (column.hasDictionary() && readState.offset != 0) {
// This batch already has dictionary encoded values but this new
page is not. The batch
// does not support a mix of dictionary and not so we will decode
the dictionary.
- updater.decodeDictionaryIds(rowId, 0, column, dictionaryIds,
dictionary);
+ updater.decodeDictionaryIds(readState.offset, 0, column,
dictionaryIds, dictionary);
}
column.setDictionary(null);
VectorizedValuesReader valuesReader = (VectorizedValuesReader)
dataColumn;
- defColumn.readBatch(num, rowId, column, maxDefLevel, valuesReader,
updater);
+ defColumn.readBatch(readState, column, valuesReader, updater);
}
-
- valuesRead += num;
- rowId += num;
- total -= num;
}
}
- private void readPage() {
+ private int readPage() {
DataPage page = pageReader.readPage();
- // TODO: Why is this a visitor?
- page.accept(new DataPage.Visitor<Void>() {
+ return page.accept(new DataPage.Visitor<Integer>() {
@Override
- public Void visit(DataPageV1 dataPageV1) {
+ public Integer visit(DataPageV1 dataPageV1) {
Review comment:
To avoid boxing, we can assign the valueCount to a local variable and
return it, e.g.
```
int pageValueCount = 0;
page.accept(new DataPage.Visitor<Integer>() {
...
pageValueCount = readPageV1(dataPageV1);
return null;
...
}
return pageValueCount;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]