[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-11-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631496#comment-17631496
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   CMIIW, the parquet writer will always create a new CodecFactory internally, 
so getCompressor does not suffer from any thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove the cached entry from the map when a thread exits. Worse, the map can grow without bound if there are tons of threads in a long-running instance. `getDecompressor` is only called on a per-column-chunk basis, so I expect the call frequency would not be very high. Would a single `ConcurrentHashMap()` be sufficient? If we really care about the regression from introducing ConcurrentHashMap when we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do the job. Any thoughts? @theosib-amazon @shangxinli 
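For readers following along, the approach described in the code comment above — explicit per-thread maps instead of ThreadLocal, so that release() can still walk every codec ever created — looks roughly like the sketch below. The class, field, and method names are illustrative only; this is not the actual PR #959 code.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch of per-thread codec caching with an explicit registry.
class PerThreadCodecCache<C> {
  // Outer map keyed by thread, inner map keyed by codec name. The outer map is
  // concurrent because many threads may register themselves at the same time.
  private final Map<Thread, Map<String, C>> allInstances = new ConcurrentHashMap<>();

  // Returns the calling thread's private instance, creating it on first use.
  C get(String codecName, Supplier<C> factory) {
    Map<String, C> mine =
        allInstances.computeIfAbsent(Thread.currentThread(), t -> new HashMap<>());
    // The inner map is only ever touched by its owning thread, so a plain HashMap suffices.
    return mine.computeIfAbsent(codecName, n -> factory.get());
  }

  // A ThreadLocal would hide other threads' instances from this method; keeping an
  // explicit map is what lets releaseAll() reach every codec ever created. The trade-off,
  // as noted in the review comment, is that entries for exited threads linger until
  // releaseAll() runs.
  void releaseAll(Consumer<C> releaser) {
    for (Map<String, C> perThread : allInstances.values()) {
      perThread.values().forEach(releaser);
    }
    allInstances.clear();
  }
}
{code}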





> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.
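To make the failure mode concrete, the caching pattern the report describes boils down to something like the following sketch: one cached instance per codec name, handed out to every calling thread. The class name and shape are illustrative, not the actual CodecFactory code.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch of the problematic pattern: a per-codec cache that hands the
// same instance to every caller, regardless of which thread is asking.
class SharedCodecCache<C> {
  private final Map<String, C> instances = new HashMap<>();

  synchronized C get(String codecName, Supplier<C> factory) {
    // The lookup itself is synchronized, but the returned object is then used
    // concurrently by all callers. A stateful, non-thread-safe codec such as
    // Hadoop's BuiltInGzipCompressor corrupts its internal state under that sharing.
    return instances.computeIfAbsent(codecName, n -> factory.get());
  }
}
{code}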





[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-11-09 Thread GitBox


wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   CMIIW, the parquet writer will always create a new CodecFactory internally, 
so getCompressor does not suffer from any thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove the cached entry from the map when a thread exits. Worse, the map can grow without bound if there are tons of threads in a long-running instance. `getDecompressor` is only called on a per-column-chunk basis, so I expect the call frequency would not be very high. Would a single `ConcurrentHashMap()` be sufficient? If we really care about the regression from introducing ConcurrentHashMap when we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do the job. Any thoughts? @theosib-amazon @shangxinli 






[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631256#comment-17631256
 ] 

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra commented on PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#issuecomment-1309294080

   @ggershinsky your review will be appreciated! 
   




> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> -------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.3
>Reporter: Parth Chandra
>Priority: Major
> Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVMs before JDK17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy. 
> The proposal, then, is to enable the use of the ByteBuffer api for 
> DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
> explicitly configures it. 
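As a rough illustration of the proposed split, a caller could route direct buffers through the new ByteBuffer overload and keep heap buffers on the byte[] overload. The Decryptor interface below simply restates the two signatures quoted above; the helper method, its parameters, and the omitted framing details (such as the length prefix implied by lengthAndCiphertext) are hypothetical.

{code:java}
import java.nio.ByteBuffer;

final class DecryptRouting {
  // The two overloads discussed in this issue, restated for the sketch.
  interface Decryptor {
    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
    ByteBuffer decrypt(ByteBuffer from, byte[] AAD);
  }

  static ByteBuffer decrypt(Decryptor decryptor, ByteBuffer input, byte[] aad,
                            boolean byteBufferPathEnabled) {
    if (input.isDirect() && byteBufferPathEnabled) {
      // Direct memory: use the ByteBuffer overload and avoid copying into a byte[].
      return decryptor.decrypt(input, aad);
    }
    byte[] bytes;
    if (input.hasArray() && input.arrayOffset() == 0 && input.position() == 0
        && input.array().length == input.remaining()) {
      // Heap buffer: the backing array is accessible without a copy.
      bytes = input.array();
    } else {
      // Fallback for sliced or read-only buffers: copy the remaining bytes out.
      bytes = new byte[input.remaining()];
      input.duplicate().get(bytes);
    }
    return ByteBuffer.wrap(decryptor.decrypt(bytes, aad));
  }
}
{code}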





[GitHub] [parquet-mr] parthchandra commented on pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-09 Thread GitBox


parthchandra commented on PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#issuecomment-1309294080

   @ggershinsky your review will be appreciated! 
   





[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631252#comment-17631252
 ] 

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra opened a new pull request, #1008:
URL: https://github.com/apache/parquet-mr/pull/1008

   The PR adds the new ByteBuffer api and also updates 
ColumnChunkPageReadStore.readPage to use the new API.
   A few additional classes were touched (ParquetReader.Builder, BytesInput) to 
allow an allocator to be specified and/or to avoid ByteBuffer -> byte array 
copying. These changes were necessary to enable the unit test.
   A user option has been added to explicitly enable/disable the use of the 
ByteBuffer api for decryption.
   ### Jira
   
   -  My PR addresses [Parquet 2212](https://issues.apache.org/jira/browse/PARQUET-2212) 
   
   ### Tests
   
   - Updates Unit test(s) in 
`org.apache.parquet.crypto.TestPropertiesDrivenEncryption`
   




> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> -------------------------------------------------------------------------
>
> Key: PARQUET-2212
> URL: https://issues.apache.org/jira/browse/PARQUET-2212
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.12.3
>Reporter: Parth Chandra
>Priority: Major
> Fix For: 1.12.3
>
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that 
> takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the 
> cost of copying the data into a byte array (and sometimes back to a 
> DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids 
> the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the 
> ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
> HeapByteBuffer, then we can continue to use the byte array API since that 
> does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not 
> able to use hardware acceleration in JVMs before JDK17. In those cases, the 
> overall decryption speed is faster with byte arrays even after incurring the 
> overhead of making a copy. 
> The proposal, then, is to enable the use of the ByteBuffer api for 
> DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
> explicitly configures it. 





[GitHub] [parquet-mr] parthchandra opened a new pull request, #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-09 Thread GitBox


parthchandra opened a new pull request, #1008:
URL: https://github.com/apache/parquet-mr/pull/1008

   The PR adds the new ByteBuffer api and also updates 
ColumnChunkPageReadStore.readPage to use the new API.
   A few additional classes were touched (ParquetReader.Builder, BytesInput) to 
allow an allocator to be specified and/or to avoid ByteBuffer -> byte array 
copying. These changes were necessary to enable the unit test.
   A user option has been added to explicitly enable/disable the use of the 
ByteBuffer api for decryption.
   ### Jira
   
   -  My PR addresses [Parquet 2212](https://issues.apache.org/jira/browse/PARQUET-2212) 
   
   ### Tests
   
   - Updates Unit test(s) in 
`org.apache.parquet.crypto.TestPropertiesDrivenEncryption`
   





[jira] [Created] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-09 Thread Parth Chandra (Jira)
Parth Chandra created PARQUET-2212:
-----------------------------------

 Summary: Add ByteBuffer api for decryptors to allow direct memory 
to be decrypted
 Key: PARQUET-2212
 URL: https://issues.apache.org/jira/browse/PARQUET-2212
 Project: Parquet
  Issue Type: Improvement
  Components: parquet-mr
Affects Versions: 1.12.3
Reporter: Parth Chandra
 Fix For: 1.12.3


The decrypt API in BlockCipher.Decryptor currently only provides an api that 
takes in a byte array
{code:java}
byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
A parquet reader that uses the DirectByteBufferAllocator has to incur the cost 
of copying the data into a byte array (and sometimes back to a 
DirectByteBuffer) to decrypt data.

This proposes adding a new API that accepts ByteBuffer as input and avoids the 
data copy.
{code:java}
ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}

The decryption in ColumnChunkPageReadStore can also be updated to use the 
ByteBuffer based api if the buffer is a DirectByteBuffer. If the buffer is a 
HeapByteBuffer, then we can continue to use the byte array API since that does 
not incur a copy when the underlying byte array is accessed.

Also, some investigation has shown that decryption with ByteBuffers is not able 
to use hardware acceleration in JVMs before JDK17. In those cases, the overall 
decryption speed is faster with byte arrays even after incurring the overhead 
of making a copy. 

The proposal, then, is to enable the use of the ByteBuffer api for 
DirectByteBuffers only, and only if the JDK is JDK17 or higher or the user 
explicitly configures it. 
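The gating rule in the last paragraph could be expressed as a small helper along the lines of the sketch below. The configuration key shown is hypothetical and only stands in for whatever option name the implementation actually exposes.

{code:java}
import java.util.Properties;

final class ByteBufferDecryptionDefault {
  // Hypothetical configuration key; the real option name is defined by the implementation.
  static final String KEY = "parquet.crypto.bytebuffer.decryption.enabled";

  static boolean enabled(Properties conf) {
    String explicit = conf.getProperty(KEY);
    if (explicit != null) {
      return Boolean.parseBoolean(explicit);   // an explicit user setting always wins
    }
    // Default: enabled only on JDK 17+, where ByteBuffer decryption can benefit from
    // AES hardware intrinsics. Runtime.version().feature() reports the major version
    // on JDK 10 and later.
    return Runtime.version().feature() >= 17;
  }
}
{code}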





[jira] [Updated] (PARQUET-2210) Skip pages based on header metadata using a callback

2022-11-09 Thread Antoine Pitrou (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antoine Pitrou updated PARQUET-2210:

Component/s: parquet-cpp

> Skip pages based on header metadata using a callback
> -----------------------------------------------------
>
> Key: PARQUET-2210
> URL: https://issues.apache.org/jira/browse/PARQUET-2210
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-cpp
>Reporter: fatemah
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, we do not expose the page header metadata and they cannot be used 
> for skipping pages. I propose exposing the metadata through a callback that 
> would allow the caller to decide if they want to read or skip the page based 
> on the metadata. The signature of the callback would be the following: 
> std::function skip_page_callback)





[jira] [Updated] (PARQUET-2210) [C++] Skip pages based on header metadata using a callback

2022-11-09 Thread Antoine Pitrou (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antoine Pitrou updated PARQUET-2210:

Summary: [C++] Skip pages based on header metadata using a callback  (was: 
Skip pages based on header metadata using a callback)

> [C++] Skip pages based on header metadata using a callback
> -----------------------------------------------------------
>
> Key: PARQUET-2210
> URL: https://issues.apache.org/jira/browse/PARQUET-2210
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-cpp
>Reporter: fatemah
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, we do not expose the page header metadata and they cannot be used 
> for skipping pages. I propose exposing the metadata through a callback that 
> would allow the caller to decide if they want to read or skip the page based 
> on the metadata. The signature of the callback would be the following: 
> std::function<bool(const DataPageStats& data_page_stats)> skip_page_callback


