This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/column-indexes by this push: new 43ac3e1 PARQUET-1364: Invalid row indexes for pages starting with nulls (#507) 43ac3e1 is described below commit 43ac3e18aeb10a421657480da9d9bd2a862fdd3c Author: Gabor Szadovszky <ga...@apache.org> AuthorDate: Fri Aug 3 07:39:28 2018 +0200 PARQUET-1364: Invalid row indexes for pages starting with nulls (#507) --- .../apache/parquet/column/impl/ColumnWriteStoreBase.java | 4 ++-- .../org/apache/parquet/column/impl/ColumnWriterBase.java | 15 ++++++++------- .../apache/parquet/column/impl/TestColumnReaderImpl.java | 8 ++++---- .../java/org/apache/parquet/column/mem/TestMemColumn.java | 5 ++++- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java index d04192f..5cd7d87 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java @@ -149,7 +149,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore { for (ColumnWriterBase memColumn : columns.values()) { long rows = rowCount - memColumn.getRowsWrittenSoFar(); if (rows > 0) { - memColumn.writePage(rowCount); + memColumn.writePage(); } memColumn.finalizeColumnChunk(); } @@ -195,7 +195,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore { long rows = rowCount - writer.getRowsWrittenSoFar(); long remainingMem = props.getPageSizeThreshold() - usedMem; if (remainingMem <= thresholdTolerance) { - writer.writePage(rowCount); + writer.writePage(); remainingMem = props.getPageSizeThreshold(); } long rowsToFillPage = diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index 
16085bb..3788c82 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -20,7 +20,6 @@ package org.apache.parquet.column.impl; import java.io.IOException; -import org.apache.parquet.Ints; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriter; import org.apache.parquet.column.ParquetProperties; @@ -52,6 +51,7 @@ abstract class ColumnWriterBase implements ColumnWriter { private Statistics<?> statistics; private long rowsWrittenSoFar = 0; + private int pageRowCount; ColumnWriterBase( ColumnDescriptor path, @@ -84,6 +84,10 @@ abstract class ColumnWriterBase implements ColumnWriter { private void repetitionLevel(int repetitionLevel) { repetitionLevelColumn.writeInteger(repetitionLevel); + assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries"; + if (repetitionLevel == 0) { + ++pageRowCount; + } } /** @@ -299,13 +303,9 @@ abstract class ColumnWriterBase implements ColumnWriter { /** * Writes the current data to a new page in the page store - * - * @param rowCount - * how many rows have been written so far */ - void writePage(long rowCount) { - int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar); - this.rowsWrittenSoFar = rowCount; + void writePage() { + this.rowsWrittenSoFar += pageRowCount; if (DEBUG) LOG.debug("write page"); try { @@ -318,6 +318,7 @@ abstract class ColumnWriterBase implements ColumnWriter { dataColumn.reset(); valueCount = 0; resetStatistics(); + pageRowCount = 0; } abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels, diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java index d2d78c4..35fddaf 100644 --- 
a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java @@ -65,10 +65,10 @@ public class TestColumnReaderImpl { for (int i = 0; i < rows; i++) { columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0); if ((i + 1) % 1000 == 0) { - columnWriterV2.writePage(i); + columnWriterV2.writePage(); } } - columnWriterV2.writePage(rows); + columnWriterV2.writePage(); columnWriterV2.finalizeColumnChunk(); List<DataPage> pages = pageWriter.getPages(); int valueCount = 0; @@ -103,10 +103,10 @@ public class TestColumnReaderImpl { for (int i = 0; i < rows; i++) { columnWriterV2.writeNull(0, 0); if ((i + 1) % 1000 == 0) { - columnWriterV2.writePage(i); + columnWriterV2.writePage(); } } - columnWriterV2.writePage(rows); + columnWriterV2.writePage(); columnWriterV2.finalizeColumnChunk(); List<DataPage> pages = pageWriter.getPages(); int valueCount = 0; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java index c28649e..e5db38c 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java @@ -138,13 +138,16 @@ public class TestMemColumn { int r = rs[i % rs.length]; int d = ds[i % ds.length]; LOG.debug("write i: {}", i); + if (i != 0 && r == 0) { + memColumnsStore.endRecord(); + } if (d == 2) { columnWriter.write((long)i, r, d); } else { columnWriter.writeNull(r, d); } - memColumnsStore.endRecord(); } + memColumnsStore.endRecord(); memColumnsStore.flush(); ColumnReader columnReader = getColumnReader(memPageStore, path, mt);