[jira] [Updated] (PARQUET-2136) File writer construction with encryptor

2022-05-18 Thread Gidon Gershinsky (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gidon Gershinsky updated PARQUET-2136:
--
Fix Version/s: 1.12.3

> File writer construction with encryptor
> ---
>
> Key: PARQUET-2136
> URL: https://issues.apache.org/jira/browse/PARQUET-2136
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, a file writer object can be constructed with encryption
> properties. We need an additional constructor that accepts an encryptor
> instead, in order to support lazy materialization of Parquet file writers.
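
For illustration, a rough sketch of the proposed shape (the class and constructor signatures below are hypothetical stand-ins, not the actual parquet-mr API):

```java
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.InternalFileEncryptor;

// Hypothetical sketch only; real parquet-mr constructor signatures may differ.
public class ExampleFileWriter {

  private final InternalFileEncryptor encryptor;

  // Existing style: the writer derives its encryptor from encryption properties.
  public ExampleFileWriter(FileEncryptionProperties encryptionProperties) {
    this(new InternalFileEncryptor(encryptionProperties));
  }

  // Proposed style: accept a ready-made encryptor, so a caller can set up
  // encryption eagerly while materializing the writer itself lazily.
  public ExampleFileWriter(InternalFileEncryptor encryptor) {
    this.encryptor = encryptor;
  }
}
```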



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (PARQUET-2144) Fix ColumnIndexBuilder for notIn predicate

2022-05-18 Thread Gidon Gershinsky (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gidon Gershinsky updated PARQUET-2144:
--
Fix Version/s: 1.12.3

> Fix ColumnIndexBuilder for notIn predicate
> --
>
> Key: PARQUET-2144
> URL: https://issues.apache.org/jira/browse/PARQUET-2144
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Huaxin Gao
>Priority: Major
> Fix For: 1.12.3
>
>
> The column index is not built correctly for the notIn predicate; this needs to be fixed.
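
For reference, a notIn predicate can be built with parquet-mr's FilterApi (a minimal sketch, assuming the in/notIn predicate support added in recent 1.12.x-era releases):

```java
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.notIn;

import java.util.HashSet;
import java.util.Set;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class NotInExample {
  // Keep rows whose "id" is NOT 1 or 2. A correct column index may only
  // prune a page when it can prove every value on the page is in the set.
  public static FilterPredicate idNotIn() {
    Set<Integer> excluded = new HashSet<>();
    excluded.add(1);
    excluded.add(2);
    return notIn(intColumn("id"), excluded);
  }
}
```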



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (PARQUET-2127) Security risk in latest parquet-jackson-1.12.2.jar

2022-05-18 Thread Gidon Gershinsky (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gidon Gershinsky updated PARQUET-2127:
--
Fix Version/s: 1.12.3

> Security risk in latest parquet-jackson-1.12.2.jar
> --
>
> Key: PARQUET-2127
> URL: https://issues.apache.org/jira/browse/PARQUET-2127
> Project: Parquet
>  Issue Type: Improvement
>Reporter: phoebe chen
>Priority: Major
> Fix For: 1.12.3
>
>
> The embedded jackson-databind 2.11.4 carries a security risk: a possible DoS
> when using JDK serialization to serialize a JsonNode
> ([https://github.com/FasterXML/jackson-databind/issues/3328]). Upgrading to
> 2.13.1 fixes this.
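
For context, the linked issue describes roughly this pattern (a hedged sketch of the reported vector, not a verified reproduction):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonNodeJdkSerialization {
  public static void main(String[] args) throws IOException {
    // Build a deeply nested document; per the report, JDK-serializing such a
    // JsonNode could consume excessive resources in jackson-databind 2.11.x.
    int depth = 500; // the report concerns much deeper nesting
    StringBuilder json = new StringBuilder();
    for (int i = 0; i < depth; i++) json.append("{\"a\":");
    json.append('1');
    for (int i = 0; i < depth; i++) json.append('}');

    JsonNode node = new ObjectMapper().readTree(json.toString());
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(node); // the JDK-serialization path at issue
    }
  }
}
```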



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539121#comment-17539121
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130698923

   @theosib-amazon I applied my PR on top of your PR, ran through some tests using 
Spark, and hit no issues. (All unit tests passed as well.)




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.
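
As a rough illustration of the proposed producer/consumer pattern (a generic sketch, not the PR's actual code), the storage reads and the downstream decode can overlap through a bounded queue:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncReadSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(4);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    int blocks = 16;

    // Producer: stands in for reading 8MB column-chunk blocks from storage.
    pool.submit(() -> {
      try {
        for (int i = 0; i < blocks; i++) {
          ByteBuffer block = ByteBuffer.allocate(8 * 1024 * 1024); // pretend I/O
          queue.put(block); // blocks only when the consumer falls behind
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Consumer: decompression + decoding would overlap with the reads above.
    for (int i = 0; i < blocks; i++) {
      ByteBuffer block = queue.take(); // blocks only if the read isn't done yet
      // ... decompress and decode pages from 'block' here ...
    }
    pool.shutdown();
  }
}
```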



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] parthchandra commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130698923

   @theosib-amazon I applied my PR on top of your PR, ran through some tests using 
Spark, and hit no issues. (All unit tests passed as well.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539119#comment-17539119
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876197602


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Sure.



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+        (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+

[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


parthchandra commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876197602


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Sure.



##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+        (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Glad you made me 

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538991#comment-17538991
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130327546

   > @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?
   
   Good point. `MultiBufferInputStream` is constructed using buffers that have 
been filled already. `AsyncMultiBufferInputStream` takes an input stream as a 
parameter in the constructor and performs the IO itself. In 
`ByteBufferInputStream` I added
   ```
 public static ByteBufferInputStream wrapAsync(ExecutorService threadPool,
     SeekableInputStream fileInputStream, List<ByteBuffer> buffers) {
   return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
 }
   ``` 
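
A hypothetical call site for the factory above (`pool`, `in`, and `buffers` are assumed to be set up by the caller; illustrative only):

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.SeekableInputStream;

class WrapAsyncUsage {
  // The caller hands over pre-allocated buffers; the async stream fills them
  // from 'in' on the pool's threads while the caller starts consuming.
  static ByteBufferInputStream open(ExecutorService pool, SeekableInputStream in,
      List<ByteBuffer> buffers) {
    return ByteBufferInputStream.wrapAsync(pool, in, buffers);
  }
}
```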




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] parthchandra commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130327546

   > @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?
   
   Good point. `MultiBufferInputStream` is constructed using buffers that have 
been filled already. `AsyncMultiBufferInputStream` takes an input stream as a 
parameter in the constructor and performs the IO itself. In 
`ByteBufferInputStream` I added
   ```
 public static ByteBufferInputStream wrapAsync(ExecutorService threadPool,
     SeekableInputStream fileInputStream, List<ByteBuffer> buffers) {
   return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
 }
   ``` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538983#comment-17538983
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876172260


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+        (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Because `checkState` has synchronization, would it be safe to move the 
`checkState` before this somehow or add some kind of less expensive boolean 
check that we can set to `true` immediately after the `super()` call?

[GitHub] [parquet-mr] kbendick commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876172260


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  final SeekableInputStream fileInputStream;
+  int fetchIndex = 0;
+  int readIndex = 0;
+  ExecutorService threadPool;
+  LinkedBlockingQueue<Future<Void>> readFutures;
+  boolean closed = false;
+  Exception ioException;
+
+  LongAdder totalTimeBlocked = new LongAdder();
+  LongAdder totalCountBlocked = new LongAdder();
+  LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+    super(buffers);
+    this.fileInputStream = fileInputStream;
+    this.threadPool = threadPool;
+    readFutures = new LinkedBlockingQueue<>(buffers.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ASYNC: Begin read into buffers ");
+      for (ByteBuffer buf : buffers) {
+        LOG.debug("ASYNC: buffer {} ", buf);
+      }
+    }
+    fetchAll();
+  }
+
+  private void checkState() {
+    if (closed) {
+      throw new RuntimeException("Stream is closed");
+    }
+    synchronized (this) {
+      if (ioException != null) {
+        throw new RuntimeException(ioException);
+      }
+    }
+  }
+
+  private void fetchAll() {
+    checkState();
+    submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+    ByteBuffer buffer = buffers.get(bufferNo);
+    try {
+      readFutures.put(threadPool.submit(() -> {
+          readOneBuffer(buffer);
+          if (bufferNo < buffers.size() - 1) {
+            submitReadTask(bufferNo + 1);
+          }
+          return null;
+        })
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+    long startTime = System.nanoTime();
+    try {
+      fileInputStream.readFully(buffer);
+      buffer.flip();
+      long readCompleted = System.nanoTime();
+      long timeSpent = readCompleted - startTime;
+      LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+      long putStart = System.nanoTime();
+      long putCompleted = System.nanoTime();
+      LOG.debug("ASYNC Stream: FS READ (output) BLOCKED - {}",
+        (putCompleted - putStart) / 1000.0);
+      fetchIndex++;
+    } catch (IOException e) {
+      // Save the exception so that the calling thread can check if something went wrong.
+      // checkState will throw an exception if the read task has failed.
+      synchronized (this) {
+        ioException = e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean nextBuffer() {
+    checkState();
+    // hack: parent constructor can call this method before this class is fully initialized.
+    // Just return without doing anything.
+    if (readFutures == null) {
+      return false;
+    }

Review Comment:
   Because `checkState` has synchronization, would it be safe to move the 
`checkState` before this somehow or add some kind of less expensive boolean 
check that we can set to `true` immediately after the `super()` call?
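
A sketch of what this suggestion might look like (hypothetical, not code from the PR):

```java
class InitializationFlagSketch {
  // Volatile flag set as the last step of construction; reading it is cheaper
  // than the null check plus the synchronized block in checkState().
  private volatile boolean initialized = false;

  InitializationFlagSketch() {
    // ... super() call and field initialization would happen here ...
    initialized = true;
  }

  boolean nextBuffer() {
    if (!initialized) {
      // The parent constructor may call into us before we are fully built.
      return false;
    }
    // ... full checkState() and buffer hand-off would follow here ...
    return true;
  }
}
```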



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538979#comment-17538979
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876165378


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Should we add `Closeable` as an implemented interface?
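
For context, the suggestion would make the interface look roughly like this (a sketch, not the merged code):

```java
import java.io.Closeable;
import java.io.IOException;

// Extending Closeable lets callers use try-with-resources while the default
// close() stays a no-op for implementations that hold no resources.
public interface PageReaderSketch extends Closeable {

  @Override
  default void close() throws IOException {}
}
```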





> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] kbendick commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


kbendick commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r876165378


##
parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java:
##
@@ -37,4 +39,9 @@ public interface PageReader {
    * @return the next page in that chunk or null if after the last page
    */
   DataPage readPage();
+
+  /**
+   * Close the page reader. By default it is a no-op.
+   */
+  default void close() throws IOException {}

Review Comment:
   Should we add `Closeable` as an implemented interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538969#comment-17538969
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130275378

   @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] theosib-amazon commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130275378

   @parthchandra One thing that confuses me a bit is that these buffers have 
only ByteBuffer inside them. There's no actual I/O, so it's not possible to 
block. Do you have subclasses that provide some sort of access to real I/O?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538967#comment-17538967
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130270383

   
   > @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap what we touch.
   > 
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   
   @theosib-amazon I read your document and went through #960. It looks like, for 
the most part, #960 and this PR complement each other. The overlap I see 
is in the changes to `MultiBufferInputStream`, where you have added the 
`readFully` and `skipFully` APIs. The bulk of my changes for async IO are in a 
class derived from `MultiBufferInputStream`, and the heart of the changes 
depends on overriding `MultiBufferInputStream.nextBuffer`. In 
`MultiBufferInputStream.nextBuffer` the assumption is that all the buffers have 
already been read into. In `AsyncMultiBufferInputStream.nextBuffer` this 
assumption is removed, and the call *blocks* only if the next required buffer 
has not been read into yet.
   Now, `skipFully` and `readFully` are potentially blocking calls because both 
call `nextBuffer` repeatedly if necessary. To gain maximum pipelining, you 
want to make calls to `skipFully` and `readFully` such that you never block for 
too long (or at all) in the call. You get this if you are skipping or reading 
less than the number of bytes in a single buffer. This is generally the case, 
as decompression and decoding operate at the page level, and a page is smaller 
than a single buffer. However, for your optimizations, you should be aware 
of this behaviour.
   From what I see, I don't think there will be a conflict. I'll pull in your 
PR and give it a deeper look. 
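
Roughly, the blocking hand-off described above amounts to something like this (an illustrative sketch, not the PR's exact implementation):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class BlockingNextBufferSketch {
  private final BlockingQueue<Future<Void>> readFutures;
  private volatile int fetchIndex; // advanced by the background read tasks
  private int readIndex;           // advanced by the consuming thread

  BlockingNextBufferSketch(BlockingQueue<Future<Void>> readFutures) {
    this.readFutures = readFutures;
  }

  boolean nextBuffer() {
    // Block only if the background read of the next buffer has not finished.
    if (readIndex >= fetchIndex) {
      try {
        readFutures.take().get(); // wait for the producer; surface failures
      } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    readIndex++;
    return true; // a real implementation would advance to the next buffer here
  }
}
```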
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] parthchandra commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130270383

   
   > @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap what we touch.
   > 
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   
   @theosib-amazon I read your document and went through #960. It looks like, for 
the most part, #960 and this PR complement each other. The overlap I see 
is in the changes to `MultiBufferInputStream`, where you have added the 
`readFully` and `skipFully` APIs. The bulk of my changes for async IO are in a 
class derived from `MultiBufferInputStream`, and the heart of the changes 
depends on overriding `MultiBufferInputStream.nextBuffer`. In 
`MultiBufferInputStream.nextBuffer` the assumption is that all the buffers have 
already been read into. In `AsyncMultiBufferInputStream.nextBuffer` this 
assumption is removed, and the call *blocks* only if the next required buffer 
has not been read into yet.
   Now, `skipFully` and `readFully` are potentially blocking calls because both 
call `nextBuffer` repeatedly if necessary. To gain maximum pipelining, you 
want to make calls to `skipFully` and `readFully` such that you never block for 
too long (or at all) in the call. You get this if you are skipping or reading 
less than the number of bytes in a single buffer. This is generally the case, 
as decompression and decoding operate at the page level, and a page is smaller 
than a single buffer. However, for your optimizations, you should be aware 
of this behaviour.
   From what I see, I don't think there will be a conflict. I'll pull in your 
PR and give it a deeper look. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538945#comment-17538945
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130229884

   > Great effort! Will have a look after the build succeeds.
   
   @shangxinli I have no idea how to get the failed CI to pass. These failures 
appear to be in unrelated areas, caused by some infra issues. Is there a way to 
trigger a rerun? 
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] parthchandra commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


parthchandra commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130229884

   > Great effort! Will have a look after the build succeeds.
   
   @shangxinli I have no idea how to get the failed CI to pass. These failures 
appear to be in unrelated areas, caused by some infra issues. Is there a way to 
trigger a rerun? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538934#comment-17538934
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

dbtsai commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130214186

   cc @rdblue @gszadovszky @ggershinsky




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] dbtsai commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


dbtsai commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130214186

   cc @rdblue @gszadovszky @ggershinsky


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538925#comment-17538925
 ] 

ASF GitHub Bot commented on PARQUET-2148:
-

shangxinli merged PR #969:
URL: https://github.com/apache/parquet-mr/pull/969




> Enable uniform decryption with plaintext footer
> ---
>
> Key: PARQUET-2148
> URL: https://issues.apache.org/jira/browse/PARQUET-2148
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, uniform decryption is not enabled in the plaintext footer mode, 
> for no good reason. The column metadata is available; we just need to decrypt 
> and use it.
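
For reference, uniform encryption with a plaintext footer can be requested along these lines (a minimal sketch, assuming the parquet-mr FileEncryptionProperties builder API; the demo key is obviously not for production use):

```java
import java.nio.charset.StandardCharsets;
import org.apache.parquet.crypto.FileEncryptionProperties;

public class UniformEncryptionExample {
  public static FileEncryptionProperties uniformWithPlaintextFooter() {
    byte[] footerKey = "0123456789012345".getBytes(StandardCharsets.UTF_8); // 16-byte demo key
    // No per-column keys are configured, so all columns are encrypted with the
    // footer key (uniform encryption), while the footer itself stays plaintext.
    return FileEncryptionProperties.builder(footerKey)
        .withPlaintextFooter()
        .build();
  }
}
```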



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] shangxinli merged pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer

2022-05-18 Thread GitBox


shangxinli merged PR #969:
URL: https://github.com/apache/parquet-mr/pull/969


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538912#comment-17538912
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-

theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130176799

   @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap in what we touch.
   
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   




> Implement async IO for Parquet file reader
> --
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for column _i_: read one chunk at a time from storage, to the end ->
> intermediate output queue -> read one uncompressed page at a time, to the
> end -> output queue -> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async IO ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] theosib-amazon commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


theosib-amazon commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130176799

   @parthchandra Would you mind having a look at my I/O performance 
optimization plan for ParquetMR? I think we should coordinate, since we have 
some ideas that might overlap in what we touch.
   
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538906#comment-17538906
 ] 

ASF GitHub Bot commented on PARQUET-2148:
-

ggershinsky commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876028764


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   This already works OK with encrypted footers; we are now fixing it for 
plaintext footers.





> Enable uniform decryption with plaintext footer
> ---
>
> Key: PARQUET-2148
> URL: https://issues.apache.org/jira/browse/PARQUET-2148
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, uniform decryption is not enabled in the plaintext footer mode, 
> for no good reason. The column metadata is available; we just need to decrypt 
> and use it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer

2022-05-18 Thread GitBox


ggershinsky commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876028764


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   This already works OK with encrypted footers; we are now fixing it for 
plaintext footers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer

2022-05-18 Thread GitBox


ggershinsky commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876022965


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   With uniform encryption, we leverage the "encrypt_with_footer_key" mode in parquet-format: every column is encrypted with the footer key, so there are no separate column keys.
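
   To make the mode concrete, here is a sketch of the writer-side setup, assuming parquet-mr's low-level crypto API: supplying only a footer key and no per-column keys yields uniform encryption, and `withPlaintextFooter()` keeps the footer readable. The key bytes are placeholders for illustration only.

   ```java
   import java.nio.charset.StandardCharsets;

   import org.apache.parquet.crypto.FileEncryptionProperties;

   public class UniformEncryptionSketch {
     public static FileEncryptionProperties uniformPlaintextFooter() {
       // Placeholder 16-byte AES key; real deployments obtain keys from a KMS.
       byte[] footerKey = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);

       // No withEncryptedColumns(...) call: every column is encrypted with the
       // footer key, i.e. the "encrypt_with_footer_key" mode in parquet-format.
       return FileEncryptionProperties.builder(footerKey)
           .withPlaintextFooter()  // footer stays readable; column data stays encrypted
           .build();
     }
   }
   ```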



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538900#comment-17538900
 ] 

ASF GitHub Bot commented on PARQUET-2148:
-

ggershinsky commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876022965


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   With uniform encryption, we leverage the "encrypt_with_footer_key" mode in parquet-format: every column is encrypted with the footer key, so there are no separate column keys.





> Enable uniform decryption with plaintext footer
> ---
>
> Key: PARQUET-2148
> URL: https://issues.apache.org/jira/browse/PARQUET-2148
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, uniform decryption is not enabled in the plaintext footer mode - 
> for no good reason. Column metadata is available, we just need to decrypt and 
> use it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] shangxinli commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-18 Thread GitBox


shangxinli commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1130130378

   Great effort! Will have a look after the build succeeds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #969: PARQUET-2148: Enable uniform decryption with plaintext footer

2022-05-18 Thread GitBox


shangxinli commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876012014


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   I am not following why we use the footer key to decrypt. Wasn't the column key used before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538893#comment-17538893
 ] 

ASF GitHub Bot commented on PARQUET-2148:
-

shangxinli commented on code in PR #969:
URL: https://github.com/apache/parquet-mr/pull/969#discussion_r876012014


##
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java:
##
@@ -1556,25 +1558,32 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       } else {  // Encrypted column
         boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
         if (encryptedWithFooterKey) { // Column encrypted with footer key
-          if (!encryptedFooter) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+          if (null == fileDecryptor) {
+            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
           }
           if (null == metaData) {
             throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
           }
-          if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
-          }
           columnPath = getPath(metaData);
+          if (!encryptedFooter) { // Unencrypted footer. Decrypt full column metadata, using footer key

Review Comment:
   I am not following why we use the footer key to decrypt. Wasn't the column key used before?





> Enable uniform decryption with plaintext footer
> ---
>
> Key: PARQUET-2148
> URL: https://issues.apache.org/jira/browse/PARQUET-2148
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, uniform decryption is not enabled in the plaintext footer mode - 
> for no good reason. Column metadata is available, we just need to decrypt and 
> use it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538757#comment-17538757
 ] 

ASF GitHub Bot commented on PARQUET-2148:
-

ggershinsky opened a new pull request, #969:
URL: https://github.com/apache/parquet-mr/pull/969

   Currently, uniform decryption is not enabled in the plaintext footer mode - 
for no good reason. Column metadata is available, we just need to decrypt and 
use it.
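
   A sketch of the read path this change unblocks, assuming the standard low-level decryption API (`FileDecryptionProperties.withFooterKey`, `ParquetReadOptions.withDecryption`); the footer key is a placeholder supplied by the caller:

   ```java
   import org.apache.hadoop.conf.Configuration;
   import org.apache.parquet.HadoopReadOptions;
   import org.apache.parquet.ParquetReadOptions;
   import org.apache.parquet.crypto.FileDecryptionProperties;

   public class UniformDecryptionReadSketch {
     public static ParquetReadOptions readOptions(byte[] footerKey) {
       // With this fix, the same footer key also decrypts the per-column
       // metadata of a uniformly encrypted file with a plaintext footer.
       FileDecryptionProperties decryption = FileDecryptionProperties.builder()
           .withFooterKey(footerKey)
           .build();
       return HadoopReadOptions.builder(new Configuration())
           .withDecryption(decryption)
           .build();
     }
   }
   ```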




> Enable uniform decryption with plaintext footer
> ---
>
> Key: PARQUET-2148
> URL: https://issues.apache.org/jira/browse/PARQUET-2148
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
> Fix For: 1.12.3
>
>
> Currently, uniform decryption is not enabled in the plaintext footer mode - 
> for no good reason. Column metadata is available, we just need to decrypt and 
> use it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] ggershinsky opened a new pull request, #969: PARQUET-2148: Enable uniform decryption with plaintext footer

2022-05-18 Thread GitBox


ggershinsky opened a new pull request, #969:
URL: https://github.com/apache/parquet-mr/pull/969

   Currently, uniform decryption is not enabled in the plaintext footer mode - 
for no good reason. Column metadata is available, we just need to decrypt and 
use it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org