[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181589911

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---

```diff
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
```

--- End diff --

I think that better integration between `ByteBuffer` and `ColumnVector` is best addressed in a separate PR. Since `tableCache` also uses `ByteBuffer`, it would be good to discuss it there.

cc: @cloud-fan
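As an illustration of the integration idea being deferred here, a `ByteBuffer`-aware method on the column vector side might look like the sketch below. This is purely hypothetical: no such overload exists in the code under review, and it is written as if it lived inside `WritableColumnVector`, delegating to the existing `putIntsLittleEndian(int, int, byte[], int)` and `putInt(int, int)` methods.

```java
// Hypothetical ByteBuffer overload for WritableColumnVector; sketch only.
// Assumes `src` is already little-endian, as produced by getBuffer() above.
public void putIntsLittleEndian(int rowId, int count, ByteBuffer src) {
  if (src.hasArray()) {
    // Heap-backed buffer: delegate to the existing byte[] bulk path.
    putIntsLittleEndian(rowId, count, src.array(), src.arrayOffset() + src.position());
    src.position(src.position() + 4 * count); // keep stream semantics consistent
  } else {
    // Direct buffer: no backing array, so copy value by value.
    for (int i = 0; i < count; i++) {
      putInt(rowId + i, src.getInt());
    }
  }
}
```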
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181564129

--- Diff: pom.xml ---

```diff
@@ -129,7 +129,7 @@
     1.2.1
     10.12.1.1
-    1.8.2
+    1.10.0
```

--- End diff --

Unlike [last time](https://github.com/apache/spark/commit/26a4cba3ffaadf382ca14980378965704ccef9ab), it seems that this PR also touches the `commons-pool` dependency. Can we avoid this?

```
-commons-pool-1.5.4.jar
+commons-pool-1.6.jar
```
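If keeping `commons-pool` at 1.5.4 were desired, one conventional Maven approach would be to pin the version in the root `pom.xml`, so a transitive bump from the Parquet upgrade does not change the resolved version. This is a hedged sketch, not part of the PR, and whether Parquet 1.10.0 actually works against commons-pool 1.5.4 would need to be verified.

```xml
<!-- Sketch only: pin commons-pool so a transitive dependency bump does not
     change the resolved version. Compatibility with Parquet 1.10.0 untested. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>commons-pool</groupId>
      <artifactId>commons-pool</artifactId>
      <version>1.5.4</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```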
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181563672

--- Diff: pom.xml ---

```diff
@@ -129,7 +129,7 @@
     1.2.1
     10.12.1.1
-    1.8.2
+    1.10.0
```

--- End diff --

@rdblue. To see the Jenkins result, could you resolve the dependency check failure with the following?

```
./dev/test-dependencies.sh --replace-manifest
```
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181540952

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---

```diff
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
```

--- End diff --

Couldn't it also be writing to an `OffHeapColumnVector`?

https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199

If so, I think the copy is 1MB at a time:

https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189

I agree that `ByteBuffer` shouldn't be supported in this PR. But there's an opportunity to use the bulk copy APIs, which would benefit from any future optimization that happens. Also, even if the copy does eventually become a loop inside the column vector implementation, a smaller loop there has a better chance of being unrolled by the JIT.
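For context, the 1MB-at-a-time behavior referenced above comes from Spark's `Platform.copyMemory`, which bounds each low-level copy (its `UNSAFE_COPY_THRESHOLD` constant) so a very large copy does not keep the JVM from reaching a safepoint. The following is a simplified sketch of that chunking pattern using `System.arraycopy` on `byte[]`; the real implementation uses `Unsafe.copyMemory` and also handles overlapping ranges.

```java
// Simplified sketch of chunked copying: bound each copy call to ~1MB,
// mirroring Platform.UNSAFE_COPY_THRESHOLD. Illustrative only.
static final int COPY_CHUNK = 1024 * 1024; // 1MB per copy call

static void chunkedCopy(byte[] src, int srcOff, byte[] dst, int dstOff, int length) {
  while (length > 0) {
    int size = Math.min(length, COPY_CHUNK);
    System.arraycopy(src, srcOff, dst, dstOff, size);
    srcOff += size;
    dstOff += size;
    length -= size;
  }
}
```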
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181535144

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---

```diff
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
```

--- End diff --

It looks that way, but it actually replaces a similar loop:

https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291

The main problem is that `ByteBuffer` isn't supported in the column vectors. That seems beyond the scope of this PR.
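The loop being referred to decodes little-endian ints from a byte array one value at a time, so the new per-value loop in the reader is not a regression so much as the same work moved. A hedged sketch of that pattern follows; the names are illustrative, not the exact `OnHeapColumnVector` code.

```java
// Illustrative per-element little-endian decode: the kind of loop both the
// old OnHeapColumnVector path and the new reader loop amount to.
static void putIntsLittleEndian(int[] dst, int rowId, int total, byte[] src, int srcOffset) {
  for (int i = 0; i < total; i++) {
    int o = srcOffset + 4 * i;
    dst[rowId + i] = (src[o] & 0xFF)           // least significant byte first
        | ((src[o + 1] & 0xFF) << 8)
        | ((src[o + 2] & 0xFF) << 16)
        | ((src[o + 3] & 0xFF) << 24);
  }
}
```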
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r181528729

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---

```diff
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
```

--- End diff --

Here and elsewhere, a bulk copy has been replaced by many smaller copies. It would be better to be able to use the bulk version. I think it would be preferable to at least have:

```java
if (buffer.hasArray()) {
  c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
} else {
  for (int i = 0; i < total; i += 1) {
    // ... etc
  }
}
```
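Filling in that suggestion, a complete version of the fast path might look like the following. This is a sketch, not the PR's code; note in particular that for a heap-backed buffer the source offset should account for `arrayOffset()` and `position()`, which the shorthand above glosses over by passing `0`.

```java
// Sketch of the suggested hasArray() fast path. Assumes getBuffer() returned
// a little-endian view positioned at the start of the values to read.
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);

if (buffer.hasArray()) {
  // Heap-backed buffer: hand the backing array to the bulk copy API.
  int srcOffset = buffer.arrayOffset() + buffer.position();
  c.putIntsLittleEndian(rowId, total, buffer.array(), srcOffset);
} else {
  // Direct buffer: no backing array, so copy value by value.
  for (int i = 0; i < total; i += 1) {
    c.putInt(rowId + i, buffer.getInt());
  }
}
```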
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
GitHub user rdblue opened a pull request:

    https://github.com/apache/spark/pull/21070

    SPARK-23972: Update Parquet to 1.10.0.

## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

## How was this patch tested?

Existing Parquet tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-23972-update-parquet-to-1.10.0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21070.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21070

----

commit 4df17a6e9726cb22e499d479a9ab48f5db18a538
Author: Ryan Blue
Date:   2017-12-01T01:25:53Z

    SPARK-23972: Update Parquet to 1.10.0.

    This updates the vectorized path for changes in Parquet 1.10.0, which uses
    ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet
    to break allocations into smaller chunks that are better for garbage
    collection.
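To make the buffer-management change concrete, the toy class below illustrates the `slice(n)` idea behind Parquet's ByteBufferInputStream: hand out views over existing buffers instead of copying bytes into freshly allocated arrays, which is what makes the resulting allocations smaller and friendlier to the garbage collector. This is an illustrative sketch, not Parquet's actual implementation, which is more general (for example, it can be backed by multiple buffers).

```java
import java.io.EOFException;
import java.nio.ByteBuffer;

// Toy single-buffer stream illustrating the slice(n) idea: return a view of
// the next n bytes rather than copying them into a new byte[].
class ToyByteBufferInputStream {
  private final ByteBuffer buffer;

  ToyByteBufferInputStream(ByteBuffer buffer) {
    this.buffer = buffer;
  }

  // Return a zero-copy view over the next `length` bytes and advance past them.
  ByteBuffer slice(int length) throws EOFException {
    if (buffer.remaining() < length) {
      throw new EOFException("Not enough bytes: need " + length);
    }
    ByteBuffer view = buffer.duplicate();
    view.limit(view.position() + length);
    buffer.position(buffer.position() + length);
    return view.slice();
  }
}
```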