[GitHub] [parquet-mr] shangxinli commented on pull request #960: Performance optimization: Move all LittleEndianDataInputStream functionality into ByteBufferInputStream

2022-12-03 Thread GitBox


shangxinli commented on PR #960:
URL: https://github.com/apache/parquet-mr/pull/960#issuecomment-1336239074

   @theosib-amazon Thanks again for your contribution! I see the comments are 
generally around duplicated code, refactoring, and keeping the code maintainable. 
If you have a measurement of the improvement from this change alone, it would 
help the reviewers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2208) Add details to nested column encryption config doc and exception text

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642899#comment-17642899
 ] 

ASF GitHub Bot commented on PARQUET-2208:
-----------------------------------------

shangxinli merged PR #1009:
URL: https://github.com/apache/parquet-mr/pull/1009




> Add details to nested column encryption config doc and exception text
> ----------------------------------------------------------------------
>
> Key: PARQUET-2208
> URL: https://issues.apache.org/jira/browse/PARQUET-2208
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.3
>Reporter: Gidon Gershinsky
>Priority: Minor
>
> Parquet columnar encryption requires an explicit full path for each column to 
> be encrypted. If a partial path is configured, the thrown exception is not 
> informative enough and doesn't help much in correcting the parameters.
> The goal is to make the exception print something like:
> _Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: 
> Encrypted column [rider] not in file schema column list: [foo] , 
> [rider.list.element.foo] , [rider.list.element.bar] , [ts] , [uuid]_
>  
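
For illustration, a minimal sketch of the configuration this exception guards, assuming parquet-mr's properties-driven encryption factory (PropertiesDrivenCryptoFactory); the key IDs and column names below are hypothetical. Each encrypted column must be listed by its full schema path, e.g. rider.list.element.foo, not by the top-level field name rider:

{code:java}
import org.apache.hadoop.conf.Configuration;

class EncryptionConfigSketch {
  // Full nested paths are required; configuring just "rider" would trigger
  // the ParquetCryptoRuntimeException shown above.
  static Configuration columnKeyConfig() {
    Configuration conf = new Configuration();
    conf.set("parquet.encryption.column.keys",
        "keyA:rider.list.element.foo,rider.list.element.bar;keyB:ts,uuid");
    conf.set("parquet.encryption.footer.key", "keyC");
    return conf;
  }
}
{code}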



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli merged pull request #1009: PARQUET-2208: Add details to nested column encryption config doc and exception text

2022-12-03 Thread GitBox


shangxinli merged PR #1009:
URL: https://github.com/apache/parquet-mr/pull/1009


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642898#comment-17642898
 ] 

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

shangxinli commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1038821325


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    BytesInput decompressed;
+
+    if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+      ByteBuffer byteBuffer = bytes.toByteBuffer();
+      if (!byteBuffer.isDirect()) {
+        throw new ParquetDecodingException("Expected a direct buffer");
+      }
+      if (blockDecryptor != null) {
+        byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+      }
+      long compressedSize = byteBuffer.limit();
+
+      ByteBuffer decompressedBuffer =
+          options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+      decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
+          dataPageV1.getUncompressedSize());
+
+      // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is

Review Comment:
   Do we know in what scenario the output byte buffer is not set? 





> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> -------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.3
>Reporter: Parth Chandra
>Priority: Major
> Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVM's before JDK17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy. 
> The proposal, then, is to enable the use of the ByteBuffer api for 
> DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
> explicitly configures it. 
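
A minimal sketch of how a caller could branch between the existing byte[] API and the proposed ByteBuffer API; the helper below is illustrative rather than the actual parquet-mr change, and it assumes the proposed overload exists and that a heap buffer is array-backed with its array holding exactly the length-prefixed ciphertext:

{code:java}
import java.nio.ByteBuffer;
import org.apache.parquet.format.BlockCipher;

class DecryptDispatchSketch {
  static ByteBuffer decryptPage(BlockCipher.Decryptor decryptor, ByteBuffer input, byte[] aad) {
    if (input.isDirect()) {
      // Proposed API from this issue: decrypt direct memory without a heap copy.
      return decryptor.decrypt(input, aad);
    }
    // Heap buffer: the existing byte[] API already avoids a copy here.
    return ByteBuffer.wrap(decryptor.decrypt(input.array(), aad));
  }
}
{code}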



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-12-03 Thread GitBox


shangxinli commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1038821325


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    BytesInput decompressed;
+
+    if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+      ByteBuffer byteBuffer = bytes.toByteBuffer();
+      if (!byteBuffer.isDirect()) {
+        throw new ParquetDecodingException("Expected a direct buffer");
+      }
+      if (blockDecryptor != null) {
+        byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+      }
+      long compressedSize = byteBuffer.limit();
+
+      ByteBuffer decompressedBuffer =
+          options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+      decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
+          dataPageV1.getUncompressedSize());
+
+      // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is

Review Comment:
   Do we know in what scenario the output byte buffer is not set? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642894#comment-17642894
 ] 

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

shangxinli commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1038820534


##
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##
@@ -44,6 +44,8 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.

Review Comment:
   Don't quite understand this comment. Where it is set to true? 





> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> -------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.3
>Reporter: Parth Chandra
>Priority: Major
> Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVM's before JDK17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy. 
> The proposal, then, is to enable the use of the ByteBuffer api for 
> DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
> explicitly configures it. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-12-03 Thread GitBox


shangxinli commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1038820534


##
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##
@@ -44,6 +44,8 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.

Review Comment:
   Don't quite understand this comment. Where it is set to true? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2198) Vulnerabilities in jackson-databind

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642892#comment-17642892
 ] 

ASF GitHub Bot commented on PARQUET-2198:
-----------------------------------------

shangxinli merged PR #1005:
URL: https://github.com/apache/parquet-mr/pull/1005




> Vulnerabilities in jackson-databind
> -----------------------------------
>
> Key: PARQUET-2198
> URL: https://issues.apache.org/jira/browse/PARQUET-2198
> Project: Parquet
>  Issue Type: Bug
>Affects Versions: 1.12.3
>Reporter: Łukasz Dziedziul
>Priority: Major
>  Labels: jackson-databind, security, vulnerabilities
>
> Update jackson-databind to mitigate CVEs:
>  * [CVE-2022-42003|https://github.com/advisories/GHSA-jjjh-jjxp-wpff] - 
> [https://nvd.nist.gov/vuln/detail/CVE-2022-42003]
>  * [CVE-2022-42004|https://github.com/advisories/GHSA-rgv9-q543-rqg4] - 
> [https://nvd.nist.gov/vuln/detail/CVE-2022-42004 (fixed in  
> 2.13.4)|https://nvd.nist.gov/vuln/detail/CVE-2022-42004]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli merged pull request #1005: PARQUET-2198 : Updating jackson data bind version to fix CVEs

2022-12-03 Thread GitBox


shangxinli merged PR #1005:
URL: https://github.com/apache/parquet-mr/pull/1005


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2184) Improve SnappyCompressor buffer expansion performance

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642891#comment-17642891
 ] 

ASF GitHub Bot commented on PARQUET-2184:
-----------------------------------------

shangxinli commented on PR #993:
URL: https://github.com/apache/parquet-mr/pull/993#issuecomment-1336215536

   @abaranec can you resolve the conflict? 




> Improve SnappyCompressor buffer expansion performance
> -----------------------------------------------------
>
> Key: PARQUET-2184
> URL: https://issues.apache.org/jira/browse/PARQUET-2184
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Andrew Baranec
>Priority: Minor
>
> The existing implementation of SnappyCompressor will only allocate enough 
> bytes for the buffer passed into setInput().  This leads to suboptimal 
> performance when there are patterns of writes that cause repeated buffer 
> expansions.  In the worst case it must copy the entire buffer for every 
> single invocation of setInput().
> Instead of allocating a buffer of size current + write length,  there should 
> be an expansion strategy that reduces the amount of copying required.
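
A minimal sketch of the kind of expansion strategy the description asks for, using geometric doubling; the helper name and allocation policy are illustrative, not the actual patch:

{code:java}
import java.nio.ByteBuffer;

class BufferGrowthSketch {
  // Doubling grows the buffer geometrically, so the total bytes copied across
  // many setInput() calls is O(n) instead of O(n^2).
  static ByteBuffer ensureCapacity(ByteBuffer buf, int additional) {
    if (buf.remaining() >= additional) {
      return buf; // enough room already, no copy
    }
    int needed = buf.position() + additional;
    int newCapacity = Math.max(buf.capacity() * 2, needed);
    ByteBuffer bigger = ByteBuffer.allocateDirect(newCapacity);
    buf.flip();      // switch the old buffer to read mode
    bigger.put(buf); // single copy of the existing contents
    return bigger;
  }
}
{code}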



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on pull request #993: PARQUET-2184: Improve the allocation behavior of SnappyCompressor

2022-12-03 Thread GitBox


shangxinli commented on PR #993:
URL: https://github.com/apache/parquet-mr/pull/993#issuecomment-1336215536

   @abaranec can you resolve the conflict? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2177) Fix parquet-cli not to fail showing descriptions

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642889#comment-17642889
 ] 

ASF GitHub Bot commented on PARQUET-2177:
-----------------------------------------

shangxinli merged PR #991:
URL: https://github.com/apache/parquet-mr/pull/991




> Fix parquet-cli not to fail showing descriptions
> ------------------------------------------------
>
> Key: PARQUET-2177
> URL: https://issues.apache.org/jira/browse/PARQUET-2177
> Project: Parquet
>  Issue Type: Bug
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
>
> Currently, trying to show the descriptions of the 'prune' and 'masking' 
> subcommands leads to NPE as follows.
> {code}
> $ java -cp 'target/parquet-cli-1.13.0-SNAPSHOT.jar:target/dependency/*' 
> org.apache.parquet.cli.Main help prune
> Exception in thread "main" java.lang.NullPointerException
>   at 
> com.beust.jcommander.JCommander$MainParameter.access$900(JCommander.java:64)
>   at 
> com.beust.jcommander.JCommander.getMainParameterDescription(JCommander.java:965)
>   at org.apache.parquet.cli.Help.run(Help.java:65)
>   at org.apache.parquet.cli.Main.run(Main.java:146)
>   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
>   at org.apache.parquet.cli.Main.main(Main.java:189)
> {code}
> {code}
> $ java -cp 'target/parquet-cli-1.13.0-SNAPSHOT.jar:target/dependency/*' 
> org.apache.parquet.cli.Main help masking
> Exception in thread "main" java.lang.NullPointerException
>   at 
> com.beust.jcommander.JCommander$MainParameter.access$900(JCommander.java:64)
>   at 
> com.beust.jcommander.JCommander.getMainParameterDescription(JCommander.java:965)
>   at org.apache.parquet.cli.Help.run(Help.java:65)
>   at org.apache.parquet.cli.Main.run(Main.java:146)
>   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
>   at org.apache.parquet.cli.Main.main(Main.java:189)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli merged pull request #991: PARQUET-2177: Fix parquet-cli not to fail showing descriptions

2022-12-03 Thread GitBox


shangxinli merged PR #991:
URL: https://github.com/apache/parquet-mr/pull/991


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-1711) [parquet-protobuf] stack overflow when work with well known json type

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642888#comment-17642888
 ] 

ASF GitHub Bot commented on PARQUET-1711:
-----------------------------------------

shangxinli closed pull request #988: PARQUET-1711: Break circular dependencies 
in proto definitions
URL: https://github.com/apache/parquet-mr/pull/988




> [parquet-protobuf] stack overflow when work with well known json type
> ----------------------------------------------------------------------
>
> Key: PARQUET-1711
> URL: https://issues.apache.org/jira/browse/PARQUET-1711
> Project: Parquet
>  Issue Type: Bug
>Affects Versions: 1.10.1
>Reporter: Lawrence He
>Priority: Major
>
> Writing the following protobuf message as a parquet file is not possible: 
> {code:java}
> syntax = "proto3";
> import "google/protobuf/struct.proto";
> package test;
> option java_outer_classname = "CustomMessage";
> message TestMessage {
> map<string, google.protobuf.ListValue> data = 1;
> } {code}
> Protobuf introduced "well known json types" such as 
> [ListValue|https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#listvalue]
>  to work around json schema conversion. 
> However, writing the above message traps the parquet writer in an infinite loop 
> due to the "general type" support in protobuf. The current implementation keeps 
> referencing the 6 possible types defined in protobuf (null, bool, number, string, 
> struct, list) and enters an infinite loop when it reaches "struct".
> {code:java}
> java.lang.StackOverflowError at 
> java.base/java.util.Arrays$ArrayItr.<init>(Arrays.java:4418) at 
> java.base/java.util.Arrays$ArrayList.iterator(Arrays.java:4410) at 
> java.base/java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1044)
>  at 
> java.base/java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1043)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:64)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  {code}
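
For illustration, a sketch of the depth-limiting idea behind the merged fix (PR #995, referenced in the comments below); the class and method names here are hypothetical, not parquet-mr's converter code:

{code:java}
import java.util.HashMap;
import java.util.Map;

class RecursionGuardSketch {
  private final int maxRecursion;
  private final Map<String, Integer> depthByType = new HashMap<>();

  RecursionGuardSketch(int maxRecursion) { this.maxRecursion = maxRecursion; }

  // Returns false once the same message type has been entered maxRecursion
  // times on the current path; the caller then emits a terminal field instead
  // of descending further, which breaks the struct/list cycle.
  boolean enter(String messageTypeName) {
    int depth = depthByType.getOrDefault(messageTypeName, 0);
    if (depth >= maxRecursion) {
      return false;
    }
    depthByType.put(messageTypeName, depth + 1);
    return true;
  }

  void leave(String messageTypeName) {
    depthByType.merge(messageTypeName, -1, Integer::sum);
  }
}
{code}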



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli closed pull request #988: PARQUET-1711: Break circular dependencies in proto definitions

2022-12-03 Thread GitBox


shangxinli closed pull request #988: PARQUET-1711: Break circular dependencies 
in proto definitions
URL: https://github.com/apache/parquet-mr/pull/988


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-1711) [parquet-protobuf] stack overflow when work with well known json type

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642887#comment-17642887
 ] 

ASF GitHub Bot commented on PARQUET-1711:
-----------------------------------------

shangxinli commented on PR #988:
URL: https://github.com/apache/parquet-mr/pull/988#issuecomment-1336214908

   Since https://github.com/apache/parquet-mr/pull/995 is merged, let's close 
this one. Thanks @matthieun for the contribution!




> [parquet-protobuf] stack overflow when work with well known json type
> ----------------------------------------------------------------------
>
> Key: PARQUET-1711
> URL: https://issues.apache.org/jira/browse/PARQUET-1711
> Project: Parquet
>  Issue Type: Bug
>Affects Versions: 1.10.1
>Reporter: Lawrence He
>Priority: Major
>
> Writing the following protobuf message as a parquet file is not possible: 
> {code:java}
> syntax = "proto3";
> import "google/protobuf/struct.proto";
> package test;
> option java_outer_classname = "CustomMessage";
> message TestMessage {
> map<string, google.protobuf.ListValue> data = 1;
> } {code}
> Protobuf introduced "well known json types" such as 
> [ListValue|https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#listvalue]
>  to work around json schema conversion. 
> However, writing the above message traps the parquet writer in an infinite loop 
> due to the "general type" support in protobuf. The current implementation keeps 
> referencing the 6 possible types defined in protobuf (null, bool, number, string, 
> struct, list) and enters an infinite loop when it reaches "struct".
> {code:java}
> java.lang.StackOverflowError at 
> java.base/java.util.Arrays$ArrayItr.<init>(Arrays.java:4418) at 
> java.base/java.util.Arrays$ArrayList.iterator(Arrays.java:4410) at 
> java.base/java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1044)
>  at 
> java.base/java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1043)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:64)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.convertFields(ProtoSchemaConverter.java:66)
>  at 
> org.apache.parquet.proto.ProtoSchemaConverter.addField(ProtoSchemaConverter.java:96)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on pull request #988: PARQUET-1711: Break circular dependencies in proto definitions

2022-12-03 Thread GitBox


shangxinli commented on PR #988:
URL: https://github.com/apache/parquet-mr/pull/988#issuecomment-1336214908

   Since https://github.com/apache/parquet-mr/pull/995 is merged, let's close 
this one. Thanks @matthieun for the contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2173) Fix parquet build against hadoop 3.3.3+

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642886#comment-17642886
 ] 

ASF GitHub Bot commented on PARQUET-2173:
-----------------------------------------

shangxinli commented on PR #985:
URL: https://github.com/apache/parquet-mr/pull/985#issuecomment-1336214427

   cc @ggershinsky @wgtmac, let me know if you have concerns about merging. 




> Fix parquet build against hadoop 3.3.3+
> ---------------------------------------
>
> Key: PARQUET-2173
> URL: https://issues.apache.org/jira/browse/PARQUET-2173
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-cli
>Affects Versions: 1.13.0
>Reporter: Steve Loughran
>Priority: Major
>
> parquet won't build against hadoop 3.3.3+ because it swapped out log4j 1.2.17 
> for reload4j, and this creates maven dependency problems in parquet-cli
> {code}
> [INFO] --- maven-dependency-plugin:3.1.1:analyze-only (default) @ parquet-cli 
> ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]ch.qos.reload4j:reload4j:jar:1.2.22:provided
> {code}
> the hadoop common dependencies need to exclude this jar and any changed slf4j 
> ones.
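
The usual remedy, sketched below, is to exclude the swapped-in logging artifacts from the hadoop-common dependency in the parquet-cli POM; the artifact list here is an assumption and may differ from the actual change:

{code:xml}
<!-- Sketch only: exclude reload4j and its slf4j binding from hadoop-common. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <exclusions>
    <exclusion>
      <groupId>ch.qos.reload4j</groupId>
      <artifactId>reload4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-reload4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
{code}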



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on pull request #985: PARQUET-2173. Fix parquet build against hadoop 3.3.3+

2022-12-03 Thread GitBox


shangxinli commented on PR #985:
URL: https://github.com/apache/parquet-mr/pull/985#issuecomment-1336214427

   cc @ggershinsky @wgtmac, let me know if you have concerns about merging. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642884#comment-17642884
 ] 

ASF GitHub Bot commented on PARQUET-2159:
-----------------------------------------

shangxinli commented on PR #1011:
URL: https://github.com/apache/parquet-mr/pull/1011#issuecomment-1336210632

   Thank @gszadovszky a lot for helping with this PR! 
   
   +1 for what @gszadovszky said. The mainstream runtime JDK is still 1.8. 
Parquet is one of the underlying building blocks for many big data 
applications.  The bare minimum, for now, is to stay Java 8 compatible. 
Otherwise, forcing applications to upgrade to JDK 17 because of Parquet is 
disruptive and impacts adoption. 
   
   @jiangjiguang, I am very happy to see you have this PR to help the Parquet 
community.  Would you mind starting an email discussion to 
dev@parquet.apache.org for this topic? 




> Parquet bit-packing de/encode optimization
> ------------------------------------------
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Assignee: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its parquet reader/writer library, but the 
> built-in bit-packing en/decode is not efficient enough. 
> Our optimization of Parquet bit-packing en/decode with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since JDK 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_* 
> compared with our vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs (open-source parquet bit unpacking vs our optimized 
> vectorized SIMD implementation) of decode functions with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; below are the test results:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the parquet batch-reader ability from Spark's VectorizedParquetRecordReader, 
> which gets parquet column data in a batched way. We constructed parquet files 
> with different row counts and column counts; the column data type is Int32, the 
> maximum int value is 127 (which satisfies bit-pack encoding with bit width=7), 
> the row count ranges from 10k to 100 million, and the column count ranges 
> from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!
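
For readers unfamiliar with the function under test, a scalar sketch of what unpack8Values does for an arbitrary bit width; parquet-mr's real implementations are code-generated and fully unrolled per width, so this generic loop is only illustrative:

{code:java}
class BitUnpackSketch {
  // Unpacks 8 values of bitWidth bits each from in[inPos..] into out[outPos..],
  // using Parquet's little-endian (LSB-first) bit-packed layout.
  static void unpack8Values(final byte[] in, final int inPos, final int[] out,
      final int outPos, final int bitWidth) {
    final long mask = (1L << bitWidth) - 1;
    long buf = 0; // bit accumulator, filled LSB first
    int bits = 0; // number of valid bits currently in the accumulator
    int inIdx = inPos;
    for (int i = 0; i < 8; i++) {
      while (bits < bitWidth) {
        buf |= (long) (in[inIdx++] & 0xFF) << bits; // append the next byte
        bits += 8;
      }
      out[outPos + i] = (int) (buf & mask);
      buf >>>= bitWidth;
      bits -= bitWidth;
    }
  }
}
{code}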



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on pull request #1011: PARQUET-2159: java17 vector parquet bit-packing decode optimization

2022-12-03 Thread GitBox


shangxinli commented on PR #1011:
URL: https://github.com/apache/parquet-mr/pull/1011#issuecomment-1336210632

   Thank @gszadovszky a lot for helping with this PR! 
   
   +1 for what @gszadovszky said. The mainstream runtime JDK is still 1.8. 
Parquet is one of the underlying building blocks for many big data 
applications.  The bare minimum, for now, is to stay Java 8 compatible. 
Otherwise, forcing applications to upgrade to JDK 17 because of Parquet is 
disruptive and impacts adoption. 
   
   @jiangjiguang, I am very happy to see you have this PR to help the Parquet 
community.  Would you mind starting an email discussion to 
dev@parquet.apache.org for this topic? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642878#comment-17642878
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1336203668

   @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac 
@theosib-amazon Do you still have comments? 




> Implement async IO for Parquet file reader
> -------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified):
> - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
> - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 
> until the data has been read. Because a large part of the time spent is 
> waiting for data from storage, threads are idle and CPU utilization is really 
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So 
> for column _i_ -> read one chunk at a time until the end, from storage -> intermediate 
> output queue -> read one uncompressed page at a time until the end -> output queue -> 
> (downstream) decompression + decoding
> Note that this can be made completely self-contained in ParquetFileReader, and 
> downstream implementations like Iceberg and Spark will automatically be able 
> to take advantage without code changes as long as the ParquetFileReader APIs 
> are not changed. 
> In past work with async io [Drill - async page reader 
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
>  , I have seen 2x-3x improvement in reading speed for Parquet files.
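
A minimal sketch of the chained-submission pattern this describes: each completed read schedules the next, so I/O proceeds in the background while a consumer blocks only on the buffer it needs next. All names below are illustrative, not the PR's code:

{code:java}
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class ChainedReadSketch {
  static BlockingQueue<Future<Void>> pipelineReads(ExecutorService pool, List<Runnable> reads) {
    BlockingQueue<Future<Void>> futures = new LinkedBlockingQueue<>(Math.max(1, reads.size()));
    if (!reads.isEmpty()) {
      submit(pool, reads, futures, 0);
    }
    return futures;
  }

  private static void submit(ExecutorService pool, List<Runnable> reads,
      BlockingQueue<Future<Void>> futures, int i) {
    futures.add(pool.submit(() -> {
      reads.get(i).run(); // e.g. readFully() into buffer i
      if (i + 1 < reads.size()) {
        submit(pool, reads, futures, i + 1); // chain the next read
      }
      return null; // Callable<Void>
    }));
  }
}
{code}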



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] shangxinli commented on pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-12-03 Thread GitBox


shangxinli commented on PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#issuecomment-1336203668

   @kazuyukitanimura @steveloughran @kbendick @ggershinsky @wgtmac 
@theosib-amazon Do you still have comments? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642875#comment-17642875
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038808056


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.

Review Comment:
   I see the comment with 'hack'. What is the proper implementation? 





> Implement async IO for Parquet file reader
> -------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following flow (simplified):
> - For every column -> Read from storage in 8MB blocks -> Read all 
> uncompressed pages into output queue 
> - From output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked 

[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-12-03 Thread GitBox


shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038808056


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.

Review Comment:
   I see the comment with 'hack'. What is the proper implementation? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642873#comment-17642873
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038807754


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.
+// Just return without doing anything.
+if (readFutures == null) {
+  return false;
+}
+if (readIndex < buffers.size()) {
+  long start = System.nanoTime();
+  try {
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): Getting next buffer");
+}
+Future<Void> future = readFutures.take();
+future.get();
+long timeSpent = System.nanoTime() - start;
+totalCountBlocked.add(1);
+totalTimeBlocked.add(timeSpent);
+maxTimeBlocked.accumulate(timeSpent);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, 
timeSpent);
+}
+  } catch (Exception e) {
+if (e instanceof InterruptedException) {
+  

[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-12-03 Thread GitBox


shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038807754


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.
+// Just return without doing anything.
+if (readFutures == null) {
+  return false;
+}
+if (readIndex < buffers.size()) {
+  long start = System.nanoTime();
+  try {
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): Getting next buffer");
+}
+Future<Void> future = readFutures.take();
+future.get();
+long timeSpent = System.nanoTime() - start;
+totalCountBlocked.add(1);
+totalTimeBlocked.add(timeSpent);
+maxTimeBlocked.accumulate(timeSpent);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, 
timeSpent);
+}
+  } catch (Exception e) {
+if (e instanceof InterruptedException) {
+  Thread.currentThread().interrupt();
+}
+LOG.error("Async (next): exception while getting next buffer: ", e);
+throw new RuntimeException(e);
+  }
+  readIndex++;
+}
+return super.nextBuffer();
+  }
+
+  public void close() {
+   

[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-12-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642872#comment-17642872
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038807754


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.
+// Just return without doing anything.
+if (readFutures == null) {
+  return false;
+}
+if (readIndex < buffers.size()) {
+  long start = System.nanoTime();
+  try {
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): Getting next buffer");
+}
+Future<Void> future = readFutures.take();
+future.get();
+long timeSpent = System.nanoTime() - start;
+totalCountBlocked.add(1);
+totalTimeBlocked.add(timeSpent);
+maxTimeBlocked.accumulate(timeSpent);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, 
timeSpent);
+}
+  } catch (Exception e) {
+if (e instanceof InterruptedException) {
+  

[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-12-03 Thread GitBox


shangxinli commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1038807754


##
parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java:
##
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncMultiBufferInputStream extends MultiBufferInputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class);
+
+  private int fetchIndex = 0;
+  private final SeekableInputStream fileInputStream;
+  private int readIndex = 0;
+  private ExecutorService threadPool;
+  private LinkedBlockingQueue<Future<Void>> readFutures;
+  private boolean closed = false;
+
+  private LongAdder totalTimeBlocked = new LongAdder();
+  private LongAdder totalCountBlocked = new LongAdder();
+  private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L);
+
+  AsyncMultiBufferInputStream(ExecutorService threadPool, SeekableInputStream fileInputStream,
+      List<ByteBuffer> buffers) {
+super(buffers);
+this.fileInputStream = fileInputStream;
+this.threadPool = threadPool;
+readFutures = new LinkedBlockingQueue<>(buffers.size());
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC: Begin read into buffers ");
+  for (ByteBuffer buf : buffers) {
+LOG.debug("ASYNC: buffer {} ", buf);
+  }
+}
+fetchAll();
+  }
+
+  private void checkState() {
+if (closed) {
+  throw new RuntimeException("Stream is closed");
+}
+  }
+
+  private void fetchAll() {
+checkState();
+submitReadTask(0);
+  }
+
+  private void submitReadTask(int bufferNo) {
+ByteBuffer buffer = buffers.get(bufferNo);
+try {
+  readFutures.put(threadPool.submit(() -> {
+  readOneBuffer(buffer);
+  if (bufferNo < buffers.size() - 1) {
+submitReadTask(bufferNo + 1);
+  }
+  return null;
+})
+  );
+} catch (InterruptedException e) {
+  Thread.currentThread().interrupt();
+  throw new RuntimeException(e);
+}
+  }
+
+  private void readOneBuffer(ByteBuffer buffer) {
+long startTime = System.nanoTime();
+try {
+  fileInputStream.readFully(buffer);
+  buffer.flip();
+  long readCompleted = System.nanoTime();
+  long timeSpent = readCompleted - startTime;
+  LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0);
+  fetchIndex++;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  @Override
+  public boolean nextBuffer() {
+checkState();
+// hack: parent constructor can call this method before this class is fully initialized.
+// Just return without doing anything.
+if (readFutures == null) {
+  return false;
+}
+if (readIndex < buffers.size()) {
+  long start = System.nanoTime();
+  try {
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): Getting next buffer");
+}
+Future<Void> future = readFutures.take();
+future.get();
+long timeSpent = System.nanoTime() - start;
+totalCountBlocked.add(1);
+totalTimeBlocked.add(timeSpent);
+maxTimeBlocked.accumulate(timeSpent);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, 
timeSpent);
+}
+  } catch (Exception e) {
+if (e instanceof InterruptedException) {
+  Thread.currentThread().interrupt();
+}
+LOG.error("Async (next): exception while getting next buffer: ", e);
+throw new RuntimeException(e);
+  }
+  readIndex++;
+}
+return super.nextBuffer();
+  }
+
+  public void close() {
+