[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread Parth Chandra (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537823#comment-17537823
 ] 

Parth Chandra commented on PARQUET-2126:


FWIW, I just submitted a PR to implement async I/O for the Parquet file reader,
and in my testing with gzip I did not hit this corruption issue. That doesn't
mean the issue isn't there, and I'm sure my PR will increase the likelihood of
hitting it. This fix is highly appreciated.

> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.
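[Editorial illustration] The hazard and the fix direction described above can be sketched as follows. This is a hedged sketch, not parquet-mr's actual CodecFactory: `java.util.zip.Deflater` stands in for a non-thread-safe compressor, and the class and method names are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Deflater;

// Sketch only (not parquet-mr's real CodecFactory): instead of caching one
// shared compressor per codec name, cache one instance per (thread, codec name)
// so that a non-thread-safe compressor is never shared across threads.
public class PerThreadCompressorCache {
  // Outer map keyed by the owning thread; inner map keyed by codec name.
  private final Map<Thread, Map<String, Deflater>> allCompressors = new ConcurrentHashMap<>();

  public Deflater getCompressor(String codecName) {
    Thread me = Thread.currentThread();
    return allCompressors
        .computeIfAbsent(me, t -> new ConcurrentHashMap<>())
        .computeIfAbsent(codecName, n -> new Deflater());
  }

  // release() can still reach every instance ever created, across all threads.
  public void release() {
    for (Map<String, Deflater> perThread : allCompressors.values()) {
      for (Deflater d : perThread.values()) {
        d.end();
      }
    }
    allCompressors.clear();
  }
}
```

Within one thread, repeated calls return the same cached instance; two threads asking for the same codec name get distinct instances, which is exactly the sharing the bug report says must stop.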



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537813#comment-17537813
 ] 

ASF GitHub Bot commented on PARQUET-2149:
-----------------------------------------

parthchandra opened a new pull request, #968:
URL: https://github.com/apache/parquet-mr/pull/968

   
   
   ### Jira
   
   This PR addresses the following 
[PARQUET-2149](https://issues.apache.org/jira/browse/PARQUET-2149): Implement 
async IO for Parquet file reader
   
   ### Tests
   
   This PR adds the following unit tests:
   - AsyncMultiBufferInputStream.*
   - TestMultipleWriteRead.testReadWriteAsync
   - TestColumnChunkPageWriteStore.testAsync
   
   The PR is also tested by changing the default configuration to make all
reads async and then ensuring all unit tests pass.
   
   




> Implement async IO for Parquet file reader
> ------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From the output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for Column _i_: read one chunk at a time from storage until the end ->
> intermediate output queue -> read one uncompressed page at a time until the
> end -> output queue -> (downstream) decompression + decoding.
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async I/O ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.
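[Editorial illustration] The serialized-vs-asynchronous flow described above can be sketched as a plain producer/consumer pipeline. This is a toy model under stated assumptions (byte arrays stand in for 8MB column-chunk reads, chunk length stands in for decoding), not the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of the proposed flow: an I/O task streams chunks of one column
// into an intermediate queue while the downstream consumer "decodes" them
// concurrently, instead of decoding only after the whole column is read.
public class AsyncColumnReader {
  private static final byte[] POISON = new byte[0]; // end-of-column marker

  public static List<Integer> readColumn(List<byte[]> chunksOnStorage) throws Exception {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(4);
    ExecutorService pool = Executors.newFixedThreadPool(1);

    // Producer: simulates reading the column from storage, chunk by chunk.
    pool.submit(() -> {
      for (byte[] chunk : chunksOnStorage) queue.put(chunk);
      queue.put(POISON);
      return null;
    });

    // Consumer: downstream "decompression + decoding" overlaps with the reads.
    List<Integer> decoded = new ArrayList<>();
    for (byte[] chunk = queue.take(); chunk != POISON; chunk = queue.take()) {
      decoded.add(chunk.length); // stand-in for real decode work
    }
    pool.shutdown();
    return decoded;
  }
}
```

The design point is that the `BlockingQueue` decouples the I/O wait from the CPU-bound decode, so neither side idles while the other works.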





[GitHub] [parquet-mr] parthchandra opened a new pull request, #968: PARQUET-2149: Async IO implementation for ParquetFileReader

2022-05-16 Thread GitBox


parthchandra opened a new pull request, #968:
URL: https://github.com/apache/parquet-mr/pull/968

   
   
   ### Jira
   
   This PR addresses the following 
[PARQUET-2149](https://issues.apache.org/jira/browse/PARQUET-2149): Implement 
async IO for Parquet file reader
   
   ### Tests
   
   This PR adds the following unit tests:
   - AsyncMultiBufferInputStream.*
   - TestMultipleWriteRead.testReadWriteAsync
   - TestColumnChunkPageWriteStore.testAsync
   
   The PR is also tested by changing the default configuration to make all
reads async and then ensuring all unit tests pass.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] theosib-amazon commented on pull request #962: Performance optimization to ByteBitPackingValuesReader

2022-05-16 Thread GitBox


theosib-amazon commented on PR #962:
URL: https://github.com/apache/parquet-mr/pull/962#issuecomment-1128059990

   There is no new functionality here, just a performance optimization. It looks
like the following tests should already cover this: BitPackingPerfTest and
TestBitPackingColumn. There are also a bunch of other tests that exercise it
indirectly, because they test classes that instantiate ByteBitPackingValuesReader.
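[Editorial illustration] For context on the kind of work ByteBitPackingValuesReader does, here is a toy bit-unpacking routine. It is not parquet-mr's code; the layout (values packed back-to-back, least-significant bit first) is a simplifying assumption for the example.

```java
// Toy illustration of bit-unpacking (not parquet-mr's actual
// ByteBitPackingValuesReader): extract the index-th value of bitWidth bits
// from a packed byte array, least-significant bit first.
public class BitUnpack {
  public static int get(byte[] packed, int bitWidth, int index) {
    int value = 0;
    int startBit = index * bitWidth;
    for (int i = 0; i < bitWidth; i++) {
      int bit = startBit + i;
      int b = packed[bit >> 3] & 0xFF;        // byte containing this bit
      value |= ((b >> (bit & 7)) & 1) << i;   // copy the bit into position i
    }
    return value;
  }
}
```

A bit-at-a-time loop like this is exactly what such optimizations replace with word-sized masks and shifts; the semantics stay the same.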





[jira] [Updated] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-16 Thread Parth Chandra (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Chandra updated PARQUET-2149:
-----------------------------------
Description: 
ParquetFileReader's implementation has the following (simplified) flow:
      - For every column -> read from storage in 8MB blocks -> read all
uncompressed pages into an output queue
      - From the output queues -> (downstream) decompression + decoding

This flow is serialized, which means that downstream threads are blocked until
the data has been read. Because a large part of the time is spent waiting for
data from storage, threads are idle and CPU utilization is very low.

There is no reason why this cannot be made asynchronous _and_ parallel. So,
for Column _i_: read one chunk at a time from storage until the end ->
intermediate output queue -> read one uncompressed page at a time until the
end -> output queue -> (downstream) decompression + decoding.

Note that this can be made completely self-contained in ParquetFileReader, and
downstream implementations like Iceberg and Spark will automatically be able to
take advantage without code changes, as long as the ParquetFileReader APIs are
not changed.

In past work with async I/O ([Drill - async page reader
|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
I have seen a 2x-3x improvement in reading speed for Parquet files.

  was:
ParquetFileReader's implementation has the following flow (simplified) - 
      - For every column -> Read from storage in 8MB blocks -> Read all 
uncompressed pages into output queue 
      - From output queues -> (downstream ) decompression + decoding

This flow is serialized, which means that downstream threads are blocked until 
the data has been read. Because a large part of the time spent is waiting for 
data from storage, threads are idle and CPU utilization is really low.

There is no reason why this cannot be made asynchronous _and_ parallel. So 

For Column _i_ -> reading one chunk until end, from storage -> intermediate 
output queue -> read one uncompressed page until end -> output queue -> 
(downstream ) decompression + decoding

Note that this can be made completely self contained in ParquetFileReader and 
downstream implementations like Iceberg and Spark will automatically be able to 
take advantage without code change as long as the ParquetFileReader apis are 
not changed. 

In past work with async io [ Drill - async page reader 
|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]
 , I have seen 2x-3x improvement in reading speed for Parquet files.


> Implement async IO for Parquet file reader
> ------------------------------------------
>
> Key: PARQUET-2149
> URL: https://issues.apache.org/jira/browse/PARQUET-2149
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Parth Chandra
>Priority: Major
>
> ParquetFileReader's implementation has the following (simplified) flow:
>       - For every column -> read from storage in 8MB blocks -> read all
> uncompressed pages into an output queue
>       - From the output queues -> (downstream) decompression + decoding
> This flow is serialized, which means that downstream threads are blocked
> until the data has been read. Because a large part of the time is spent
> waiting for data from storage, threads are idle and CPU utilization is very
> low.
> There is no reason why this cannot be made asynchronous _and_ parallel. So,
> for Column _i_: read one chunk at a time from storage until the end ->
> intermediate output queue -> read one uncompressed page at a time until the
> end -> output queue -> (downstream) decompression + decoding.
> Note that this can be made completely self-contained in ParquetFileReader,
> and downstream implementations like Iceberg and Spark will automatically be
> able to take advantage without code changes, as long as the ParquetFileReader
> APIs are not changed.
> In past work with async I/O ([Drill - async page reader
> |https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
> I have seen a 2x-3x improvement in reading speed for Parquet files.





[jira] [Created] (PARQUET-2149) Implement async IO for Parquet file reader

2022-05-16 Thread Parth Chandra (Jira)
Parth Chandra created PARQUET-2149:
----------------------------------

 Summary: Implement async IO for Parquet file reader
 Key: PARQUET-2149
 URL: https://issues.apache.org/jira/browse/PARQUET-2149
 Project: Parquet
  Issue Type: Improvement
  Components: parquet-mr
Reporter: Parth Chandra


ParquetFileReader's implementation has the following (simplified) flow:
      - For every column -> read from storage in 8MB blocks -> read all
uncompressed pages into an output queue
      - From the output queues -> (downstream) decompression + decoding

This flow is serialized, which means that downstream threads are blocked until
the data has been read. Because a large part of the time is spent waiting for
data from storage, threads are idle and CPU utilization is very low.

There is no reason why this cannot be made asynchronous _and_ parallel. So,
for Column _i_: read one chunk at a time from storage until the end ->
intermediate output queue -> read one uncompressed page at a time until the
end -> output queue -> (downstream) decompression + decoding.

Note that this can be made completely self-contained in ParquetFileReader, and
downstream implementations like Iceberg and Spark will automatically be able to
take advantage without code changes, as long as the ParquetFileReader APIs are
not changed.

In past work with async I/O ([Drill - async page reader
|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java]),
I have seen a 2x-3x improvement in reading speed for Parquet files.





[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537636#comment-17537636
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127885617

   > My question is: when a thread exits, we don't have a corresponding evict
operation on the map. Using a thread pool might be OK if the thread object is not
changed, but I'm not sure whether there is a scenario where threads are created
and exited quickly and we leak in that case.
   
   No matter what thread release() is called from, it will clean up all
(de)compressors from all threads. I designed it specifically this way so that a
leak won't happen, as long as close/release is called when it should be.
   
   Note that it's not appropriate to call close or release while
(de)compression is still in progress. If someone does that, it might still work,
but it would be a protocol violation. The usage pattern should be:
   
   - Create Codec factory
   - Create worker threads
   - Threads create codecs
   - Threads finish using codecs
   - Threads terminate
   - The thread that created the worker threads waits until those threads are 
done
   - close/release is called.
   
   Someone might do something different, but that would be a bug no different 
from someone closing a file in one thread while it's being written to in 
another.
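[Editorial illustration] The usage pattern listed above can be sketched as a small harness. The `Factory` here is a stand-in with invented names, not the real CodecFactory; the point is only the ordering: workers create and use codecs, the owner joins them, and only then calls release().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the safe lifecycle for a per-thread codec cache. The "factory"
// is a stand-in: it records one entry per thread that asked for a codec.
public class CodecLifecycle {
  static class Factory {
    final ConcurrentHashMap<Thread, String> codecs = new ConcurrentHashMap<>();
    String getCompressor() { return codecs.computeIfAbsent(Thread.currentThread(), t -> "codec"); }
    void release() { codecs.clear(); }
  }

  public static int run(int workers) throws InterruptedException {
    Factory factory = new Factory();                        // 1. create the codec factory
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < workers; i++) {                     // 2. create worker threads
      Thread t = new Thread(() -> factory.getCompressor()); // 3-4. threads create and use codecs
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) t.join();                      // 5-6. owner waits for workers to finish
    int created = factory.codecs.size();
    factory.release();                                      // 7. release only after all workers are done
    return created;
  }
}
```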




> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.





[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-05-16 Thread GitBox


theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127885617

   > My question is: when a thread exits, we don't have a corresponding evict
operation on the map. Using a thread pool might be OK if the thread object is not
changed, but I'm not sure whether there is a scenario where threads are created
and exited quickly and we leak in that case.
   
   No matter what thread release() is called from, it will clean up all
(de)compressors from all threads. I designed it specifically this way so that a
leak won't happen, as long as close/release is called when it should be.
   
   Note that it's not appropriate to call close or release while
(de)compression is still in progress. If someone does that, it might still work,
but it would be a protocol violation. The usage pattern should be:
   
   - Create Codec factory
   - Create worker threads
   - Threads create codecs
   - Threads finish using codecs
   - Threads terminate
   - The thread that created the worker threads waits until those threads are 
done
   - close/release is called.
   
   Someone might do something different, but that would be a bug no different 
from someone closing a file in one thread while it's being written to in 
another.





[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537620#comment-17537620
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127847443

My question is: when a thread exits, we don't have a corresponding evict
operation on the map. Using a thread pool might be OK if the thread object is not
changed, but I'm not sure whether there is a scenario where threads are created
and exited quickly and we leak in that case.




> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.





[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-05-16 Thread GitBox


shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127847443

My question is: when a thread exits, we don't have a corresponding evict
operation on the map. Using a thread pool might be OK if the thread object is not
changed, but I'm not sure whether there is a scenario where threads are created
and exited quickly and we leak in that case.





[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537614#comment-17537614
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873888258


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> all_compressors = new ConcurrentHashMap<>();

Review Comment:
   I changed it to camel case.





> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.





[GitHub] [parquet-mr] theosib-amazon commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-05-16 Thread GitBox


theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873888258


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> all_compressors = new ConcurrentHashMap<>();

Review Comment:
   I changed it to camel case.






[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537613#comment-17537613
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127839048

   > If we change it to be per-thread, then would it be a problem in the
scenario where short-lived threads come and go? When a thread stops, we
might not know and leak here.
   > 
   > And, please add tests
   
   This is why I use the concurrent hash map, indexed by the thread. Short-lived
threads are not a problem in that case.
   
   I can't think of how I would go about testing this. Do you have any ideas?
I'll have a look to see whether any tests already exist and whether I can add
something.




> Thread safety bug in CodecFactory
> -
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.





[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-05-16 Thread GitBox


theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127839048

   > If we change it to be per-thread, then would it be a problem in the
scenario where short-lived threads come and go? When a thread stops, we
might not know and leak here.
   > 
   > And, please add tests
   
   This is why I use the concurrent hash map, indexed by the thread. Short-lived
threads are not a problem in that case.
   
   I can't think of how I would go about testing this. Do you have any ideas?
I'll have a look to see whether any tests already exist and whether I can add
something.





[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537611#comment-17537611
 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873884939


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##
@@ -184,8 +192,18 @@ public CompressionCodecName getCodecName() {
 
   }
 
+  /*
+  Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   */
   @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    Thread me = Thread.currentThread();

Review Comment:
   A Thread object is created once and exists until the thread dies. Thread does
not override hashCode(), so it falls back to the implementation in Object, which
returns a fixed identity-based hash for the object.
   
   I did consider using ThreadLocal, but then it would not be possible for
release() to clean up all of the (de)compressors from defunct threads.
   
   The way I did it appears to be the recommended solution, since that's what I
find when I google this problem.
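[Editorial illustration] The claim above, that a Thread object is a stable map key for its lifetime and that a Thread-keyed map can later be swept from any thread, can be checked directly with a small standalone demo (plain Java, no parquet-mr code):

```java
import java.util.concurrent.ConcurrentHashMap;

// Thread does not override hashCode()/equals(), so each Thread object is a
// stable identity key: the same thread always hits the same entry, and a
// map keyed by Thread can later be swept by a cleanup call on any thread.
public class ThreadKeyDemo {
  static final ConcurrentHashMap<Thread, Integer> perThread = new ConcurrentHashMap<>();

  // Increment this thread's counter; returns the new per-thread count.
  static int touch() {
    return perThread.merge(Thread.currentThread(), 1, Integer::sum);
  }
}
```

Note the trade-off the comment thread is debating: an entry for a dead thread stays in the map until the sweep runs, which is why release() must be responsible for the cleanup.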





> Thread safety bug in CodecFactory
> ---------------------------------
>
> Key: PARQUET-2126
> URL: https://issues.apache.org/jira/browse/PARQUET-2126
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.12.2
>Reporter: James Turton
>Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.





[GitHub] [parquet-mr] theosib-amazon commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

2022-05-16 Thread GitBox


theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873884939


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##
@@ -184,8 +192,18 @@ public CompressionCodecName getCodecName() {
 
   }
 
+  /*
+  Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   */
   @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    Thread me = Thread.currentThread();

Review Comment:
   A Thread object is created once and exists until the thread dies. Thread does
not override hashCode(), so it falls back to the implementation in Object, which
returns a fixed identity-based hash for the object.
   
   I did consider using ThreadLocal, but then it would not be possible for
release() to clean up all of the (de)compressors from defunct threads.
   
   The way I did it appears to be the recommended solution, since that's what I
find when I google this problem.






[GitHub] [parquet-mr] theosib-amazon commented on pull request #960: Performance optimization: Move all LittleEndianDataInputStream functionality into ByteBufferInputStream

2022-05-16 Thread GitBox


theosib-amazon commented on PR #960:
URL: https://github.com/apache/parquet-mr/pull/960#issuecomment-1127827189

   That improvement comes from a larger set of changes. I have a design doc 
that goes over all of those changes, plus some more that make it possible to 
get even more performance improvement.
   
   
https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing





[jira] [Commented] (PARQUET-2069) Parquet file containing arrays, written by Parquet-MR, cannot be read again by Parquet-MR

2022-05-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537607#comment-17537607
 ] 

ASF GitHub Bot commented on PARQUET-2069:
-

theosib-amazon commented on PR #957:
URL: https://github.com/apache/parquet-mr/pull/957#issuecomment-1127822921

   OK, check out the code changes. I've redone this completely. Now what it 
does is try the Avro schema first, and if that fails, it caches the exception 
and tries again with an Avro schema converted from the Parquet schema. This 
fixes not only 2069 but at least one other bug (whose number I can't remember).




> Parquet file containing arrays, written by Parquet-MR, cannot be read again 
> by Parquet-MR
> -
>
> Key: PARQUET-2069
> URL: https://issues.apache.org/jira/browse/PARQUET-2069
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-avro
>Affects Versions: 1.12.0
> Environment: Windows 10
>Reporter: Devon Kozenieski
>Priority: Blocker
> Attachments: modified.parquet, original.parquet, parquet-diff.png
>
>
> In the attached files, there is one original file, and one written modified 
> file that results after reading the original file and writing it back with 
> Parquet-MR, with a few values modified. The schema should not be modified, 
> since the schema of the input file is used as the schema to write the output 
> file. However, the output file has a slightly modified schema that then 
> cannot be read back the same way again with Parquet-MR, resulting in the 
> exception message:  java.lang.ClassCastException: optional binary element 
> (STRING) is not a group
> My guess is that the issue lies in the Avro schema conversion.
> The Parquet files attached have some arrays and some nested fields.





[GitHub] [parquet-mr] theosib-amazon commented on pull request #957: PARQUET-2069: Allow list and array record types to be compatible.

2022-05-16 Thread GitBox


theosib-amazon commented on PR #957:
URL: https://github.com/apache/parquet-mr/pull/957#issuecomment-1127822921

   OK, check out the code changes. I've redone this completely. Now what it 
does is try the Avro schema first, and if that fails, it caches the exception 
and tries again with an Avro schema converted from the Parquet schema. This 
fixes not only 2069 but at least one other bug (whose number I can't remember).





[jira] [Commented] (PARQUET-2069) Parquet file containing arrays, written by Parquet-MR, cannot be read again by Parquet-MR

2022-05-16 Thread Timothy Miller (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537604#comment-17537604
 ] 

Timothy Miller commented on PARQUET-2069:
-

Well, I tried modifying prepareForRead to always reconstruct the Avro schema 
from the Parquet schema, but that caused another test to fail 
(org.apache.parquet.avro.TestGenericLogicalTypes). So in the end I decided to 
try using the Avro schema first and, if that throws an exception, fall back to 
conversion and try again.
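The fallback described above can be sketched generically (a hedged outline 
only; SchemaFallback and readWithFallback are hypothetical names, not the 
actual parquet-avro API):

```java
import java.util.function.Supplier;

// Try reading with the Avro schema stored in the file; if that fails,
// retry with a schema converted from the Parquet schema. If the retry
// also fails, rethrow the original exception with the second attached.
public class SchemaFallback {
    static <T> T readWithFallback(Supplier<T> withFileAvroSchema,
                                  Supplier<T> withConvertedSchema) {
        try {
            return withFileAvroSchema.get();
        } catch (RuntimeException first) {
            try {
                return withConvertedSchema.get();
            } catch (RuntimeException second) {
                // Surface the original failure; the fallback failed too.
                first.addSuppressed(second);
                throw first;
            }
        }
    }

    public static void main(String[] args) {
        String result = readWithFallback(
            () -> {
                // Simulates the failure mode from PARQUET-2069.
                throw new ClassCastException(
                    "optional binary element (STRING) is not a group");
            },
            () -> "read OK with converted schema");
        System.out.println(result); // prints "read OK with converted schema"
    }
}
```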



Re: Forward & Backwards Compatibility

2022-05-16 Thread Antoine Pitrou
On Thu, 12 May 2022 09:46:57 -0700
William Butler 
wrote:
> 
> From the JIRA, the converted type looks something like
> 
>   required group FeatureAmounts (MAP) {
> repeated group map (MAP_KEY_VALUE) {
>   required binary key (STRING);
>   required binary value (STRING);
> }
>   }
> 
> 
> but the logical type looks like
> 
>   required group FeatureAmounts (MAP) {
> repeated group map (UNKNOWN) {
>   required binary key (STRING);
>   required binary value (STRING);
> }
>   }
> 
> Parquet C++ does not like that the UNKNOWN/NullLogicalType is being used in
> the groups and rejects the schema with an exception.

Well, why is UNKNOWN used here? This seems like a bug in the producer:
if MAP_KEY_VALUE does not have an equivalent logical type, then no
logical type annotation should be produced, instead of the "UNKNOWN"
logical type annotation which means that all values are null and the
"real" type of the data is therefore lost.

(I understand that this is probably due to confusion from the misnaming
of the "UNKNOWN" logical type, which would have been more appropriately
named "ALWAYS_NULL" or similar)

> The second example involves an INT64 column with a TIMESTAMP_MILLIS
> converted type but a String logical type. Parquet-mr in this example
> falls back to the timestamp converted type whereas Parquet C++ throws an
> exception.

Well, I don't know why a String logical type should be accepted for
integer columns with a timestamp converted type.  The fact that
parquet-mr accepts it sounds like a bug in parquet-mr, IMO.

Regards

Antoine.




[jira] [Created] (PARQUET-2148) Enable uniform decryption with plaintext footer

2022-05-16 Thread Gidon Gershinsky (Jira)
Gidon Gershinsky created PARQUET-2148:
-

 Summary: Enable uniform decryption with plaintext footer
 Key: PARQUET-2148
 URL: https://issues.apache.org/jira/browse/PARQUET-2148
 Project: Parquet
  Issue Type: Bug
  Components: parquet-mr
Reporter: Gidon Gershinsky
Assignee: Gidon Gershinsky
 Fix For: 1.12.3


Currently, uniform decryption is not enabled in the plaintext footer mode, for 
no good reason. The column metadata is available; we just need to decrypt and 
use it.


