sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657290651
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -170,77 +170,135 @@ public int readInteger() {
* }
*/
public void readBatch(
- int total,
- int offset,
+ ParquetReadState state,
WritableColumnVector values,
- int maxDefinitionLevel,
VectorizedValuesReader valueReader,
ParquetVectorUpdater updater) throws IOException {
- int left = total;
- while (left > 0) {
+ int offset = state.offset;
+ long rowId = state.rowId;
+
+ while (state.hasMoreInPage(offset, rowId)) {
if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(left, this.currentCount);
- switch (mode) {
- case RLE:
- if (currentValue == maxDefinitionLevel) {
- updater.updateBatch(n, offset, values, valueReader);
- } else {
- values.putNulls(offset, n);
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
- updater.update(offset + i, values, valueReader);
+ int n = Math.min(state.valuesToReadInBatch + state.offset - offset,
this.currentCount);
Review comment:
because `state.valuesToReadInBatch` is the number of values left in the
batch. Suppose the initial batch size is 1000 and `valuesToReadInBatch` is
400: this means we've already read 600 values, so `state.offset` and `offset`
are both 600 at the beginning of this call. As `offset` advances,
`offset - state.offset` gives how many values we've read so far in this call,
so `state.valuesToReadInBatch + state.offset - offset` is the number of
values still left to read, which is what caps `n` here.
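
The arithmetic above can be sketched as a tiny standalone example (the class
and method names below are hypothetical, not Spark's actual `ParquetReadState`;
the numbers mirror the 1000/400/600 scenario in the comment):

```java
// Sketch of the "values left to read" arithmetic from the review comment:
//   n = min(valuesToReadInBatch + state.offset - offset, currentCount)
public class RemainingValuesSketch {

  // valuesToReadInBatch + stateOffset - offset
  //   = valuesToReadInBatch - (offset - stateOffset)
  //   = values left in batch minus values read so far in this call.
  static int remaining(int valuesToReadInBatch, int stateOffset, int offset) {
    return valuesToReadInBatch + stateOffset - offset;
  }

  public static void main(String[] args) {
    // Batch size 1000; 600 values already read before this call,
    // so valuesToReadInBatch is 400 and both offsets start at 600.
    int valuesToReadInBatch = 400;
    int stateOffset = 600;
    int offset = 600;

    System.out.println(remaining(valuesToReadInBatch, stateOffset, offset)); // 400

    // After consuming an RLE/PACKED group of 150 values, offset advances.
    offset += 150;
    System.out.println(remaining(valuesToReadInBatch, stateOffset, offset)); // 250

    // n for the next group is further capped by the group's currentCount.
    int currentCount = 300;
    int n = Math.min(remaining(valuesToReadInBatch, stateOffset, offset),
        currentCount);
    System.out.println(n); // 250
  }
}
```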
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]