[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory
[ https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631496#comment-17631496 ]

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:

@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment: CMIIW, but the parquet writer always creates a new CodecFactory internally, so getCompressor does not suffer from any thread-safety issue.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:

@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment: It looks like we cannot remove a cached entry from the map when its thread exits. Worse, the map can grow without bound if a long-running instance creates many threads. `getDecompressor` is called only on a per-column-chunk basis, so I expect the call frequency to be low. Would a single `ConcurrentHashMap` be sufficient? If we really care about the regression from introducing ConcurrentHashMap where we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do that job. Any thoughts? @theosib-amazon @shangxinli

> Thread safety bug in CodecFactory
> ---------------------------------
>
>                 Key: PARQUET-2126
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2126
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.12.2
>            Reporter: James Turton
>            Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths to achieve thread safety, including keeping Codec objects in an Apache Commons pool that has thread-safe borrow semantics. This is all undone by the BytesCompressor and BytesDecompressor Maps in org.apache.parquet.hadoop.CodecFactory, which end up caching single compressor and decompressor instances due to code in CodecFactory#getCompressor and CodecFactory#getDecompressor. When the caller runs multiple threads, those threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that library is itself thread safe. But when BuiltInGzipCompressor from Hadoop is selected for the CompressionCodecName.GZIP case, serious problems ensue. That class is not thread safe, and sharing one instance of it between threads produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and decompressor instances.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
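The per-thread bookkeeping described in the diff above can be sketched as follows. This is a simplified illustration, not the actual parquet-mr code: `PerThreadCodecCache` and its `Supplier`-based factory parameter are hypothetical names, and a plain `Object` stands in for the real `BytesCompressor`/`BytesDecompressor` types. The outer map keyed by `Thread` mirrors the comment in the hunk: unlike a `ThreadLocal`, it lets `release()` reach every instance ever created.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of the per-thread codec cache described in the diff.
class PerThreadCodecCache<C> {
    // Outer map is keyed by thread so releaseAll() can still reach every instance
    // ever created, which a plain ThreadLocal would not allow.
    private final Map<Thread, Map<String, C>> all = new ConcurrentHashMap<>();

    C get(String codecName, Supplier<C> factory) {
        // The inner map is only ever touched by its owning thread,
        // so a plain HashMap is safe there.
        Map<String, C> mine = all.computeIfAbsent(Thread.currentThread(), t -> new HashMap<>());
        return mine.computeIfAbsent(codecName, n -> factory.get());
    }

    void releaseAll(Consumer<C> release) {
        // Iterate over every codec created by any thread, then forget them all.
        for (Map<String, C> perThread : all.values()) {
            perThread.values().forEach(release);
        }
        all.clear();
    }
}
```

Each thread gets its own codec instance, so a stateful codec such as BuiltInGzipCompressor is never shared across threads; the cost, as the review comment notes, is that entries for dead threads linger until `releaseAll()`.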
[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe
wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:

@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment: CMIIW, but the parquet writer always creates a new CodecFactory internally, so getCompressor does not suffer from any thread-safety issue.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:

@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment: It looks like we cannot remove a cached entry from the map when its thread exits. Worse, the map can grow without bound if a long-running instance creates many threads. `getDecompressor` is called only on a per-column-chunk basis, so I expect the call frequency to be low. Would a single `ConcurrentHashMap` be sufficient? If we really care about the regression from introducing ConcurrentHashMap where we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do that job. Any thoughts? @theosib-amazon @shangxinli

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
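The alternative wgtmac raises, a single shared `ConcurrentHashMap`, would rely on the atomic get-or-create semantics of `computeIfAbsent`. A hedged sketch of that behavior is below (names are illustrative, not parquet-mr's); note that caching one instance per codec name this way is only safe when that instance is itself thread-safe, which per the Jira is exactly what BuiltInGzipCompressor is not.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative single shared cache. computeIfAbsent guarantees the factory
// runs at most once per key even under concurrent callers, so all threads
// see the same cached instance for a given codec name.
class SharedCodecCache {
    static final ConcurrentHashMap<String, Object> SHARED = new ConcurrentHashMap<>();
    static final AtomicInteger CREATED = new AtomicInteger();

    static Object getCodec(String name) {
        return SHARED.computeIfAbsent(name, n -> {
            CREATED.incrementAndGet(); // count factory invocations for illustration
            return new Object();       // stand-in for a real (de)compressor
        });
    }
}
```

The design tension in the thread is visible here: a single map is simpler and bounded in size, but shares instances across threads, whereas the per-thread scheme in the PR isolates instances at the cost of unbounded growth with thread count.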
[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted
[ https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631256#comment-17631256 ]

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra commented on PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#issuecomment-1309294080

@ggershinsky your review will be appreciated!

> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
>                 Key: PARQUET-2212
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2212
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.3
>            Reporter: Parth Chandra
>            Priority: Major
>             Fix For: 1.12.3
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the cost of copying the data into a byte array (and sometimes back to a DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the ByteBuffer-based API if the buffer is a DirectByteBuffer. If the buffer is a HeapByteBuffer, then we can continue to use the byte array API, since that does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not able to use hardware acceleration in JVMs before JDK 17. In those cases, the overall decryption speed is faster with byte arrays even after incurring the overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for DirectByteBuffers only, and only if the JDK is JDK 17 or higher or the user explicitly configures it.
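The heap-vs-direct dispatch described in the issue can be sketched as follows. This is a hypothetical helper, not the parquet-mr API: the two decryptor interfaces merely mirror the byte[] and ByteBuffer signatures quoted above. The point it illustrates is that a heap buffer's backing array is reachable without a copy, while a direct buffer's bytes are not.

```java
import java.nio.ByteBuffer;

// Hypothetical dispatcher for the proposal: byte[] path for heap buffers,
// ByteBuffer path for direct buffers.
class DecryptDispatch {
    interface ByteArrayDecryptor { byte[] decrypt(byte[] lengthAndCiphertext, byte[] aad); }
    interface ByteBufferDecryptor { ByteBuffer decrypt(ByteBuffer from, byte[] aad); }

    static ByteBuffer decrypt(ByteBuffer input, byte[] aad,
                              ByteArrayDecryptor arrayPath, ByteBufferDecryptor bufferPath) {
        if (!input.isDirect() && input.hasArray() && input.arrayOffset() == 0
                && input.remaining() == input.array().length) {
            // Heap buffer: the backing array is reachable without a copy,
            // so the existing byte[] API costs nothing extra.
            return ByteBuffer.wrap(arrayPath.decrypt(input.array(), aad));
        }
        // Direct buffer: hand it to the ByteBuffer API to avoid copying into a byte[].
        return bufferPath.decrypt(input, aad);
    }
}
```

A direct buffer hitting the byte[] API would need an explicit `get()` into a fresh array (and possibly a copy back), which is exactly the cost the new API avoids.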
[GitHub] [parquet-mr] parthchandra commented on pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted
parthchandra commented on PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#issuecomment-1309294080

@ggershinsky your review will be appreciated!
[jira] [Commented] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted
[ https://issues.apache.org/jira/browse/PARQUET-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17631252#comment-17631252 ]

ASF GitHub Bot commented on PARQUET-2212:
-----------------------------------------

parthchandra opened a new pull request, #1008:
URL: https://github.com/apache/parquet-mr/pull/1008

The PR adds the new ByteBuffer api and also updates ColumnChunkPageReadStore.readPage to use the new API. A few additional classes were touched (ParquetReader.Builder, BytesInput) to allow an allocator to be specified and/or to avoid ByteBuffer -> byte array copying. These changes were necessary to enable the unit test. A user option has been added to explicitly enable/disable the use of the ByteBuffer api for decryption.

### Jira
- My PR addresses [Parquet 2212](https://issues.apache.org/jira/browse/PARQUET-2212)

### Tests
- Updates unit test(s) in `org.apache.parquet.crypto.TestPropertiesDrivenEncryption`

> Add ByteBuffer api for decryptors to allow direct memory to be decrypted
> ------------------------------------------------------------------------
>
>                 Key: PARQUET-2212
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2212
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.12.3
>            Reporter: Parth Chandra
>            Priority: Major
>             Fix For: 1.12.3
>
> The decrypt API in BlockCipher.Decryptor currently only provides an api that takes in a byte array
> {code:java}
> byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
> A parquet reader that uses the DirectByteBufferAllocator has to incur the cost of copying the data into a byte array (and sometimes back to a DirectByteBuffer) to decrypt data.
> This proposes adding a new API that accepts ByteBuffer as input and avoids the data copy.
> {code:java}
> ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
> The decryption in ColumnChunkPageReadStore can also be updated to use the ByteBuffer-based API if the buffer is a DirectByteBuffer. If the buffer is a HeapByteBuffer, then we can continue to use the byte array API, since that does not incur a copy when the underlying byte array is accessed.
> Also, some investigation has shown that decryption with ByteBuffers is not able to use hardware acceleration in JVMs before JDK 17. In those cases, the overall decryption speed is faster with byte arrays even after incurring the overhead of making a copy.
> The proposal, then, is to enable the use of the ByteBuffer API for DirectByteBuffers only, and only if the JDK is JDK 17 or higher or the user explicitly configures it.
[GitHub] [parquet-mr] parthchandra opened a new pull request, #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted
parthchandra opened a new pull request, #1008:
URL: https://github.com/apache/parquet-mr/pull/1008

The PR adds the new ByteBuffer api and also updates ColumnChunkPageReadStore.readPage to use the new API. A few additional classes were touched (ParquetReader.Builder, BytesInput) to allow an allocator to be specified and/or to avoid ByteBuffer -> byte array copying. These changes were necessary to enable the unit test. A user option has been added to explicitly enable/disable the use of the ByteBuffer api for decryption.

### Jira
- My PR addresses [Parquet 2212](https://issues.apache.org/jira/browse/PARQUET-2212)

### Tests
- Updates unit test(s) in `org.apache.parquet.crypto.TestPropertiesDrivenEncryption`
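The gating rule described in this PR (ByteBuffer path for direct buffers only, and only on JDK 17+ or when the user opts in) could be expressed as a small predicate like the sketch below. The method and option names are illustrative, not parquet-mr's actual configuration keys.

```java
import java.nio.ByteBuffer;

// Illustrative gate for choosing the ByteBuffer decrypt path, per the proposal:
// direct buffers only, and only when ByteBuffer decryption can be hardware-
// accelerated (JDK 17+) or the user has explicitly enabled it.
class ByteBufferDecryptGate {
    static boolean useByteBufferPath(ByteBuffer buf, boolean explicitlyEnabled) {
        if (!buf.isDirect()) {
            return false; // heap buffers stay on the byte[] path (array access is copy-free)
        }
        // Runtime.version().feature() returns the JDK feature release, e.g. 17.
        return explicitlyEnabled || Runtime.version().feature() >= 17;
    }
}
```

On older JDKs without the opt-in flag, even direct buffers fall back to the copy-then-byte[] path, which the investigation quoted in the Jira found faster there despite the copy.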
[jira] [Created] (PARQUET-2212) Add ByteBuffer api for decryptors to allow direct memory to be decrypted
Parth Chandra created PARQUET-2212:
--------------------------------------

             Summary: Add ByteBuffer api for decryptors to allow direct memory to be decrypted
                 Key: PARQUET-2212
                 URL: https://issues.apache.org/jira/browse/PARQUET-2212
             Project: Parquet
          Issue Type: Improvement
          Components: parquet-mr
    Affects Versions: 1.12.3
            Reporter: Parth Chandra
             Fix For: 1.12.3

The decrypt API in BlockCipher.Decryptor currently only provides an api that takes in a byte array
{code:java}
byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);{code}
A parquet reader that uses the DirectByteBufferAllocator has to incur the cost of copying the data into a byte array (and sometimes back to a DirectByteBuffer) to decrypt data.
This proposes adding a new API that accepts ByteBuffer as input and avoids the data copy.
{code:java}
ByteBuffer decrypt(ByteBuffer from, byte[] AAD);{code}
The decryption in ColumnChunkPageReadStore can also be updated to use the ByteBuffer-based API if the buffer is a DirectByteBuffer. If the buffer is a HeapByteBuffer, then we can continue to use the byte array API, since that does not incur a copy when the underlying byte array is accessed.
Also, some investigation has shown that decryption with ByteBuffers is not able to use hardware acceleration in JVMs before JDK 17. In those cases, the overall decryption speed is faster with byte arrays even after incurring the overhead of making a copy.
The proposal, then, is to enable the use of the ByteBuffer API for DirectByteBuffers only, and only if the JDK is JDK 17 or higher or the user explicitly configures it.
[jira] [Updated] (PARQUET-2210) Skip pages based on header metadata using a callback
[ https://issues.apache.org/jira/browse/PARQUET-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antoine Pitrou updated PARQUET-2210:
------------------------------------
    Component/s: parquet-cpp

> Skip pages based on header metadata using a callback
> ----------------------------------------------------
>
>                 Key: PARQUET-2210
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2210
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: fatemah
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, we do not expose the page header metadata, and it cannot be used for skipping pages. I propose exposing the metadata through a callback that would allow the caller to decide if they want to read or skip the page based on the metadata. The signature of the callback would be the following:
> std::function<bool(const DataPageStats&)> skip_page_callback
[jira] [Updated] (PARQUET-2210) [C++] Skip pages based on header metadata using a callback
[ https://issues.apache.org/jira/browse/PARQUET-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antoine Pitrou updated PARQUET-2210:
------------------------------------
    Summary: [C++] Skip pages based on header metadata using a callback  (was: Skip pages based on header metadata using a callback)

> [C++] Skip pages based on header metadata using a callback
> ----------------------------------------------------------
>
>                 Key: PARQUET-2210
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2210
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: fatemah
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, we do not expose the page header metadata, and it cannot be used for skipping pages. I propose exposing the metadata through a callback that would allow the caller to decide if they want to read or skip the page based on the metadata. The signature of the callback would be the following:
> std::function<bool(const DataPageStats&)> skip_page_callback
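The proposed contract (the reader hands each page's header stats to a caller-supplied callback, and skips the page when the callback says so) can be sketched in Java for illustration; the actual parquet-cpp API is C++, and `DataPageStats` here is a simplified stand-in for whatever fields the real type exposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Java sketch of the proposed skip-page callback. The reader consults the
// caller's predicate before decoding each page; pages the predicate marks as
// skippable are never decompressed or decoded.
class SkipPageDemo {
    // Hypothetical stand-in for the stats a page header would expose.
    static class DataPageStats {
        final long numValues;
        DataPageStats(long numValues) { this.numValues = numValues; }
    }

    static List<DataPageStats> readPages(List<DataPageStats> pages,
                                         Predicate<DataPageStats> skipPage) {
        List<DataPageStats> decoded = new ArrayList<>();
        for (DataPageStats page : pages) {
            if (skipPage.test(page)) {
                continue; // skipped based on header metadata alone
            }
            decoded.add(page);
        }
        return decoded;
    }
}
```

The benefit of the callback shape is that the skip decision is made from the header alone, before paying any decompression or decoding cost for the page body.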