wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   CMIIW, the parquet writer always creates a new CodecFactory internally, so getCompressor does not suffer from any thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove a cached entry from the map when its thread exits. Worse, the map will grow without bound if a long-running instance spawns many threads. `getDecompressor` is called only once per column chunk, so I expect the call frequency to be fairly low. Would a single `ConcurrentHashMap<CompressionCodecName, BytesDecompressor>` be sufficient? If we are really worried about the overhead of introducing ConcurrentHashMap in cases where we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation for that job. Any thoughts? @theosib-amazon @shangxinli



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
