[jira] [Updated] (PARQUET-2136) File writer construction with encryptor
[ https://issues.apache.org/jira/browse/PARQUET-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gidon Gershinsky updated PARQUET-2136:
--------------------------------------
    Fix Version/s: 1.12.3

> File writer construction with encryptor
> ---------------------------------------
>
>                 Key: PARQUET-2136
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2136
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.2
>            Reporter: Gidon Gershinsky
>            Assignee: Gidon Gershinsky
>            Priority: Major
>             Fix For: 1.12.3
>
>
> Currently, a file writer object can be constructed with encryption
> properties. We need an additional constructor, that can accept an encryptor
> instead, in order to support lazy materialization of parquet file writers.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Updated] (PARQUET-2144) Fix ColumnIndexBuilder for notIn predicate
[ https://issues.apache.org/jira/browse/PARQUET-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gidon Gershinsky updated PARQUET-2144:
--------------------------------------
    Fix Version/s: 1.12.3

> Fix ColumnIndexBuilder for notIn predicate
> ------------------------------------------
>
>                 Key: PARQUET-2144
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2144
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>            Reporter: Huaxin Gao
>            Priority: Major
>             Fix For: 1.12.3
>
>
> Column Index is not built correctly for notIn predicate. Need to fix the bug.
[jira] [Updated] (PARQUET-2127) Security risk in latest parquet-jackson-1.12.2.jar
[ https://issues.apache.org/jira/browse/PARQUET-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gidon Gershinsky updated PARQUET-2127:
--------------------------------------
    Fix Version/s: 1.12.3

> Security risk in latest parquet-jackson-1.12.2.jar
> --------------------------------------------------
>
>                 Key: PARQUET-2127
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2127
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: phoebe chen
>            Priority: Major
>             Fix For: 1.12.3
>
>
> The embedded jackson-databind:2.11.4 carries a security risk: a possible DoS
> when JDK serialization is used to serialize JsonNode
> ([https://github.com/FasterXML/jackson-databind/issues/3328]). Upgrading to
> 2.13.1 fixes this.
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539121#comment-17539121 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130698923

   @theosib-amazon I applied my PR on top of your PR, ran thru some tests using Spark, and hit no issues. (All unit tests passed as well).

> Implement async IO for Parquet file reader
> ------------------------------------------
>
>                 Key: PARQUET-2149
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2149
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Parth Chandra
>            Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified) -
>       - For every column -> Read from storage in 8MB blocks -> Read all
> uncompressed pages into output queue
>       - From output queues -> (downstream ) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time spent is
> waiting for data from storage, threads are idle and CPU utilization is really
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So
> For Column _i_ -> reading one chunk until end, from storage -> intermediate
> output queue -> read one uncompressed page until end -> output queue ->
> (downstream ) decompression + decoding
> Note that this can be made completely self contained in ParquetFileReader and
> downstream implementations like Iceberg and Spark will automatically be able
> to take advantage without code change as long as the ParquetFileReader apis
> are not changed.
> In past work with async io [Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
> , I have seen 2x-3x improvement in reading speed for Parquet files.
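The pipelined flow proposed in the issue description can be sketched in plain Java. This is only an illustration under stated assumptions, not code from PR #968: the class `AsyncPipelineSketch`, the `readChunk` and `decode` stand-ins, and the queue capacity of 2 are all invented for the example, and real storage reads and decompression are replaced by trivial placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: one reader task per column feeds a bounded queue that a decoder drains. */
final class AsyncPipelineSketch {

  // Sentinel marking the end of a column's chunk stream (compared by identity).
  private static final byte[] END = new byte[0];

  /** Stand-in for a storage read: a chunk filled with the chunk number. */
  static byte[] readChunk(int chunkNo, int chunkSize) {
    byte[] chunk = new byte[chunkSize];
    Arrays.fill(chunk, (byte) chunkNo);
    return chunk;
  }

  /** Stand-in for decompression + decoding: sums the bytes of a chunk. */
  static long decode(byte[] chunk) {
    long sum = 0;
    for (byte b : chunk) {
      sum += b;
    }
    return sum;
  }

  /** Reads one column's chunks on the IO pool and decodes each chunk as soon as it arrives. */
  static long readColumn(ExecutorService ioPool, int numChunks, int chunkSize) throws Exception {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(2);
    Future<?> producer = ioPool.submit(() -> {
      for (int i = 1; i <= numChunks; i++) {
        queue.put(readChunk(i, chunkSize)); // blocks when the decoder falls behind
      }
      queue.put(END);
      return null;
    });
    long total = 0;
    for (byte[] chunk = queue.take(); chunk != END; chunk = queue.take()) {
      total += decode(chunk);
    }
    producer.get(); // surfaces any exception thrown by the reader task
    return total;
  }

  /** Runs the pipeline for several columns in parallel and returns the per-column totals. */
  static List<Long> readAll(int numColumns, int numChunks, int chunkSize) {
    ExecutorService ioPool = Executors.newFixedThreadPool(numColumns);
    ExecutorService decodePool = Executors.newFixedThreadPool(numColumns);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (int c = 0; c < numColumns; c++) {
        futures.add(decodePool.submit(() -> readColumn(ioPool, numChunks, chunkSize)));
      }
      List<Long> totals = new ArrayList<>();
      for (Future<Long> f : futures) {
        totals.add(f.get());
      }
      return totals;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      ioPool.shutdown();
      decodePool.shutdown();
    }
  }
}
```

The bounded queue is the design choice that matters here: it lets reads run ahead of decoding without letting them run arbitrarily far ahead, so memory stays capped while neither side idles for long.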
[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader
parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876197602

## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
## @@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Sure.

## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
## @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+    List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+        (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized(this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Glad you made me
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538991#comment-17538991 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130327546

   > @parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O?

   Good point. `MultiBufferInputStream` is constructed using buffers that have been filled already. `AsyncMultiBufferInputStream` takes an input stream as a parameter in the constructor and performs the IO itself.
   In `ByteBufferInputStream` I added
   ```
     public static ByteBufferInputStream wrapAsync(ExecutorService threadPool,
       SeekableInputStream fileInputStream, List<ByteBuffer> buffers) {
       return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
     }
   ```
[GitHub] [parquet-mr] kbendick commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader
kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876172260

## parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
## @@ -0,0 +1,173 @@
[...]
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Because `checkState` has synchronization, would it be safe to move the `checkState` before this somehow or add some kind of less expensive boolean check that we can set to `true` immediately after the `super()` call?
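The hazard behind the `readFutures == null` check, and the flag-based alternative raised in the review comment, can be shown with a small self-contained example. This is a sketch under assumptions: `BaseStream`, `AsyncStream`, the `initialized` flag and the `guardedCalls` counter are hypothetical names, not classes from parquet-mr, and the real `MultiBufferInputStream` constructor may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parent whose constructor calls an overridable method. */
class BaseStream {
  BaseStream() {
    nextBuffer(); // runs before any subclass field initializer has executed
  }

  boolean nextBuffer() {
    return true;
  }
}

/** Hypothetical subclass that guards against the early call with a cheap flag. */
class AsyncStream extends BaseStream {
  // Deliberately no initializer: an explicit "= false" would run after super()
  // returns, which is harmless here but easy to get wrong with "= true".
  private boolean initialized;
  // Still null while the parent constructor is running.
  private final List<String> pending = new ArrayList<>();
  // Counts calls that arrived before initialization finished (no initializer, so
  // the increment made during super() is not overwritten afterwards).
  int guardedCalls;

  AsyncStream() {
    super();
    initialized = true; // set immediately after the parent constructor returns
  }

  @Override
  boolean nextBuffer() {
    if (!initialized) {
      guardedCalls++;
      return false; // too early: subclass state does not exist yet
    }
    return pending.isEmpty(); // safe: pending is non-null from here on
  }
}
```

A plain boolean like this is only known to be safe when read from the constructing thread; whether that is sufficient for the reader tasks in the PR is not settled by the thread.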
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538979#comment-17538979 ]

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876165378

## parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
## @@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Should we add `Closeable` as an implemented interface?
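What the review comment proposes might look roughly like the following. This is a sketch, not the change made in PR #968: `PageSource` is a hypothetical stand-in for `PageReader`, pages are plain strings, and `CountingPageSource` and `CloseableSketch` exist only to make the example runnable.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical stand-in for PageReader: extends Closeable, with a no-op default close(). */
interface PageSource extends Closeable {
  /** @return the next page, or null after the last page */
  String readPage();

  @Override
  default void close() throws IOException {
    // no-op by default, so existing implementations keep compiling
  }
}

/** Example implementation that overrides close() to release a resource. */
final class CountingPageSource implements PageSource {
  private final Iterator<String> pages;
  boolean closed;

  CountingPageSource(String... pages) {
    this.pages = Arrays.asList(pages).iterator();
  }

  @Override
  public String readPage() {
    return pages.hasNext() ? pages.next() : null;
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class CloseableSketch {
  /** Drains a source inside try-with-resources and returns the number of pages read. */
  static int drain(PageSource source) {
    int count = 0;
    try (PageSource s = source) {
      while (s.readPage() != null) {
        count++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return count;
  }
}
```

Extending `Closeable` is what makes the reader usable in try-with-resources; the default method alone gives callers a `close()` to invoke but no language-level guarantee that it is called.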
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538969#comment-17538969 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130275378 @parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O? > Implement async IO for Parquet file reader > -- > > Key: PARQUET-2149 > URL: https://issues.apache.org/jira/browse/PARQUET-2149 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Parth Chandra >Priority: Major > > ParquetFileReader's implementation has the following flow (simplified) - > - For every column -> Read from storage in 8MB blocks -> Read all > uncompressed pages into output queue > - From output queues -> (downstream ) decompression + decoding > This flow is serialized, which means that downstream threads are blocked > until the data has been read. Because a large part of the time spent is > waiting for data from storage, threads are idle and CPU utilization is really > low. > There is no reason why this cannot be made asynchronous _and_ parallel. So > For Column _i_ -> reading one chunk until end, from storage -> intermediate > output queue -> read one uncompressed page until end -> output queue -> > (downstream ) decompression + decoding > Note that this can be made completely self contained in ParquetFileReader and > downstream implementations like Iceberg and Spark will automatically be able > to take advantage without code change as long as the ParquetFileReader apis > are not changed. 
> In past work with async io [Drill - async page reader > |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java] > , I have seen 2x-3x improvement in reading speed for Parquet files. -- This message was sent by Atlassian Jira (v8.20.7#820007)
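The proposed flow (read each column chunk asynchronously, then hand it to downstream decompression and decoding) can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `readChunk` and `decode` are hypothetical stand-ins for the storage read and the decode step, and the pool sizes are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncColumnReadSketch {
  // Hypothetical stand-in for a storage read: returns the raw bytes of one column chunk.
  static byte[] readChunk(int column) {
    byte[] chunk = new byte[4];
    Arrays.fill(chunk, (byte) column);
    return chunk;
  }

  // Hypothetical stand-in for decompression + decoding: sums the bytes of a chunk.
  static int decode(byte[] chunk) {
    int sum = 0;
    for (byte b : chunk) {
      sum += b;
    }
    return sum;
  }

  // Starts one read per column on an I/O pool and chains decoding on a CPU pool,
  // so decoding of one column can overlap with reads of the others.
  static List<Integer> readAll(int columns) {
    ExecutorService ioPool = Executors.newFixedThreadPool(4);
    ExecutorService cpuPool = Executors.newFixedThreadPool(2);
    try {
      List<CompletableFuture<Integer>> pending = new ArrayList<>();
      for (int c = 0; c < columns; c++) {
        final int column = c;
        pending.add(CompletableFuture
            .supplyAsync(() -> readChunk(column), ioPool)
            .thenApplyAsync(AsyncColumnReadSketch::decode, cpuPool));
      }
      List<Integer> results = new ArrayList<>();
      for (CompletableFuture<Integer> f : pending) {
        results.add(f.join()); // results are collected in column order
      }
      return results;
    } finally {
      ioPool.shutdown();
      cpuPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(readAll(3));
  }
}
```

Separate pools for I/O and decoding are a design choice of this sketch: threads blocked on storage do not starve the CPU-bound stage.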
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538967#comment-17538967 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130270383
> @parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch.
> https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
@theosib-amazon I read your document and went through #960. It looks like, for the most part, #960 and this PR complement each other. The overlap I see is in the changes to `MultiBufferInputStream`, where you have added the `readFully` and `skipFully` APIs. The bulk of my changes for async IO are in a class derived from `MultiBufferInputStream`, and the heart of the changes depends on overriding `MultiBufferInputStream.nextBuffer`. In `MultiBufferInputStream.nextBuffer` the assumption is that all the buffers have been read into. In `AsyncMultiBufferInputStream.nextBuffer` this assumption is removed, and the call *blocks* only if the next required buffer has not been read into.
Now, `skipFully` and `readFully` are potentially blocking calls because both call `nextBuffer` repeatedly if necessary. To gain maximum pipelining, you want to make calls to `skipFully` and `readFully` such that you never block for too long (or at all) in the call. You will get this if you are skipping or reading less than the number of bytes in a single buffer. This is generally the case, as decompression and decoding happen at the page level and a page is smaller than a single buffer. However, for your optimizations, you should be aware of this behaviour. From what I see, I don't think there will be a conflict. I'll pull in your PR and give it a deeper look.
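The blocking behaviour described in this comment can be sketched as below. This is an illustration under assumptions, not the code from PR #968: the class `AsyncBufferStreamSketch` and its use of `CompletableFuture<ByteBuffer>` are hypothetical, chosen only to show a `nextBuffer` that waits solely when the next buffer has not been filled yet, and a `readFully` that may call it repeatedly.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the AsyncMultiBufferInputStream from the PR.
class AsyncBufferStreamSketch {
  private final List<CompletableFuture<ByteBuffer>> buffers;
  private int next = 0;
  private ByteBuffer current = ByteBuffer.allocate(0);

  AsyncBufferStreamSketch(List<CompletableFuture<ByteBuffer>> buffers) {
    this.buffers = buffers;
  }

  // Advances to the next buffer; join() blocks only if that buffer is not filled yet.
  private boolean nextBuffer() {
    if (next >= buffers.size()) {
      return false;
    }
    current = buffers.get(next++).join();
    return true;
  }

  // Reads exactly dst.length bytes, crossing buffer boundaries when needed.
  // Each boundary crossing is a potential blocking point.
  void readFully(byte[] dst) {
    int off = 0;
    while (off < dst.length) {
      if (!current.hasRemaining() && !nextBuffer()) {
        throw new IllegalStateException("unexpected end of stream");
      }
      int n = Math.min(current.remaining(), dst.length - off);
      current.get(dst, off, n);
      off += n;
    }
  }

  // Demo: the first read stays inside an already-filled buffer; the second crosses into
  // a buffer that is completed later (by an I/O thread in real use, inline here).
  static byte[] demo() {
    CompletableFuture<ByteBuffer> first =
        CompletableFuture.completedFuture(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    CompletableFuture<ByteBuffer> second = new CompletableFuture<>();
    List<CompletableFuture<ByteBuffer>> all = new ArrayList<>();
    all.add(first);
    all.add(second);
    AsyncBufferStreamSketch in = new AsyncBufferStreamSketch(all);

    byte[] head = new byte[2];
    in.readFully(head); // served from the first buffer, no waiting

    second.complete(ByteBuffer.wrap(new byte[] {4, 5}));
    byte[] tail = new byte[3];
    in.readFully(tail); // one byte from the first buffer, two from the second
    return tail;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(demo()));
  }
}
```

A read smaller than one buffer crosses at most one boundary, which is why page-sized reads rarely wait for long in this model.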
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538945#comment-17538945 ] ASF GitHub Bot commented on PARQUET-2149: - parthchandra commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130229884 > Great effort! Will have a look after the build succeeds. @shangxinli I have no idea how to get the failed CI to pass. These failures appear to be in unrelated areas caused by some infra issues. Is there a way to trigger a rerun?
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538934#comment-17538934 ] ASF GitHub Bot commented on PARQUET-2149: - dbtsai commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130214186 cc @rdblue @gszadovszky @ggershinsky
[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer
[ https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538925#comment-17538925 ] ASF GitHub Bot commented on PARQUET-2148: - shangxinli merged PR #969: URL: https://github.com/apache/parquet-mr/pull/969 > Enable uniform decryption with plaintext footer > --- > > Key: PARQUET-2148 > URL: https://issues.apache.org/jira/browse/PARQUET-2148 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Reporter: Gidon Gershinsky >Assignee: Gidon Gershinsky >Priority: Major > Fix For: 1.12.3 > > > Currently, uniform decryption is not enabled in the plaintext footer mode - > for no good reason. Column metadata is available, we just need to decrypt and > use it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader
[ https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538912#comment-17538912 ] ASF GitHub Bot commented on PARQUET-2149: - theosib-amazon commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130176799 @parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch. https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer
[ https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538906#comment-17538906 ] ASF GitHub Bot commented on PARQUET-2148: - ggershinsky commented on code in PR #969: URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876028764
## parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java: ##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
         } else { // Encrypted column
           boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
           if (encryptedWithFooterKey) { // Column encrypted with footer key
-            if (!encryptedFooter) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+            if (null == fileDecryptor) {
+              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
             }
             if (null == metaData) {
               throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
             }
-            if (null == fileDecryptor) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-            }
             columnPath = getPath(metaData);
+            if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key
Review Comment: this already works ok with encrypted footers; now fixing for plaintext footers.
[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer
ggershinsky commented on code in PR #969: URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876022965
## parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java: ##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
         } else { // Encrypted column
           boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
           if (encryptedWithFooterKey) { // Column encrypted with footer key
-            if (!encryptedFooter) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+            if (null == fileDecryptor) {
+              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
             }
             if (null == metaData) {
               throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
             }
-            if (null == fileDecryptor) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-            }
             columnPath = getPath(metaData);
+            if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key
Review Comment: with uniform encryption, we leverage the "encrypt_with_footer_key" mode in parquet-format
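The order of checks that the diff above arrives at can be sketched as a small decision function. This is a simplified illustration with hypothetical names (`FooterKeyColumnSketch`, `Action`, boolean flags in place of the real `fileDecryptor` and `metaData` objects, and `IllegalStateException` in place of `ParquetCryptoRuntimeException`); it models only the control flow for a column encrypted with the footer key, not the actual decryption.

```java
// Hypothetical model of the control flow for a column encrypted with the footer key.
class FooterKeyColumnSketch {
  enum Action {
    // Footer was encrypted: column metadata was already decrypted along with it.
    USE_FOOTER_METADATA,
    // Footer is plaintext: the full column metadata must be decrypted with the footer key.
    DECRYPT_COLUMN_METADATA_WITH_FOOTER_KEY
  }

  static Action resolve(boolean encryptedFooter, boolean hasDecryptor, boolean hasMetaData) {
    // Check for keys first: without a decryptor nothing below can succeed.
    if (!hasDecryptor) {
      throw new IllegalStateException("Column encrypted with footer key: No keys available");
    }
    if (!hasMetaData) {
      throw new IllegalStateException("ColumnMetaData not set in Encryption with Footer key");
    }
    // A plaintext footer is no longer an error; it selects the extra decryption step.
    return encryptedFooter
        ? Action.USE_FOOTER_METADATA
        : Action.DECRYPT_COLUMN_METADATA_WITH_FOOTER_KEY;
  }

  public static void main(String[] args) {
    System.out.println(resolve(false, true, true));
    System.out.println(resolve(true, true, true));
  }
}
```

Before the change, the plaintext-footer case threw an exception at the first check; in this model it becomes the second enum value instead.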
[GitHub] [parquet-mr] shangxinli commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader
shangxinli commented on PR #968: URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130130378 Great effort! Will have a look after the build succeeds.
[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer
shangxinli commented on code in PR #969: URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876012014
## parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java: ##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
         } else { // Encrypted column
           boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
           if (encryptedWithFooterKey) { // Column encrypted with footer key
-            if (!encryptedFooter) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+            if (null == fileDecryptor) {
+              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
             }
             if (null == metaData) {
               throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
             }
-            if (null == fileDecryptor) {
-              throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-            }
             columnPath = getPath(metaData);
+            if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key
Review Comment: I am not following why we use the footer key to decrypt. Wasn't the column key used before?
[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer
[ https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538757#comment-17538757 ] ASF GitHub Bot commented on PARQUET-2148: - ggershinsky opened a new pull request, #969: URL: https://github.com/apache/parquet-mr/pull/969 Currently, uniform decryption is not enabled in the plaintext footer mode - for no good reason. Column metadata is available, we just need to decrypt and use it.