[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time
[ https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yujiang Zhong updated PARQUET-2160: --- Description: The decompressed stream in HeapBytesDecompressor$decompress now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think maybe we can use ByteArrayBytesInput as decompressed bytes input and close decompressed stream in time to solve this problem: {code:java} InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); {code} -> {code:java} InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); is.close(); {code} After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). was: The decompressed stream in HeapBytesDecompressor$decompress now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think maybe we can use ByteArrayBytesInput as decompressed bytes input and close decompressed stream in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). > Close decompression stream to free off-heap memory in time > -- > > Key: PARQUET-2160 > URL: https://issues.apache.org/jira/browse/PARQUET-2160 > Project: Parquet > Issue Type: Improvement > Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni > 1.4.9.1 + glibc >Reporter: Yujiang Zhong >Priority: Major > > The decompressed stream in HeapBytesDecompressor$decompress now relies on the > JVM GC to close. When reading parquet in zstd compressed format, sometimes I > ran into OOM cause high off-heap usage. I think the reason is that the GC is > not timely and causes off-heap memory fragmentation. I had to set lower > MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. > There is a > [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] > of this zstd parquet issus in Iceberg community slack: some people had the > same problem. > I think maybe we can use ByteArrayBytesInput as decompressed bytes input and > close decompressed stream in time to solve this problem: > {code:java} > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.from(is, uncompressedSize); {code} > -> > {code:java} > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); > is.close(); {code} > After I made this change to decompress, I found off-heap memory is > significantly reduced (with same query on same data). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time
[ https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yujiang Zhong updated PARQUET-2160: --- Description: The decompressed stream in HeapBytesDecompressor$decompress now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think maybe we can use ByteArrayBytesInput as decompressed bytes input and close decompressed stream in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). was: The decompressed stream in HeapBytesDecompressor$decompress now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think we can close decompressed stream mannually in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). > Close decompression stream to free off-heap memory in time > -- > > Key: PARQUET-2160 > URL: https://issues.apache.org/jira/browse/PARQUET-2160 > Project: Parquet > Issue Type: Improvement > Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni > 1.4.9.1 + glibc >Reporter: Yujiang Zhong >Priority: Major > > The decompressed stream in HeapBytesDecompressor$decompress now relies on the > JVM GC to close. When reading parquet in zstd compressed format, sometimes I > ran into OOM cause high off-heap usage. I think the reason is that the GC is > not timely and causes off-heap memory fragmentation. I had to set lower > MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. > There is a > [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] > of this zstd parquet issus in Iceberg community slack: some people had the > same problem. > I think maybe we can use ByteArrayBytesInput as decompressed bytes input and > close decompressed stream in time to solve this problem: > > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.from(is, uncompressedSize); > -> > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, > uncompressedSize)); > is.close(); > > After I made this change to decompress, I found off-heap memory is > significantly reduced (with same query on same data). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization
[ https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555142#comment-17555142 ] Timothy Miller commented on PARQUET-2159: - If this is already being generated at runtime, then it sounds to me like you might be able to generate different things depending on the compiler's source version setting. > Parquet bit-packing de/encode optimization > -- > > Key: PARQUET-2159 > URL: https://issues.apache.org/jira/browse/PARQUET-2159 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Affects Versions: 1.13.0 >Reporter: Fang-Xie >Priority: Major > Fix For: 1.13.0 > > Attachments: image-2022-06-15-22-56-08-396.png, > image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, > image-2022-06-15-22-58-40-704.png > > > Current Spark use Parquet-mr as parquet reader/writer library, but the > built-in bit-packing en/decode is not efficient enough. > Our optimization for Parquet bit-packing en/decode with jdk.incubator.vector > in Open JDK18 brings prominent performance improvement. > Due to Vector API is added to OpenJDK since 16, So this optimization request > JDK16 or higher. > *Below are our test results* > Functional test is based on open-source parquet-mr Bit-pack decoding > function: *_public final void unpack8Values(final byte[] in, final int inPos, > final int[] out, final int outPos)_* __ > compared with our implementation with vector API *_public final void > unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final > int outPos)_* > We tested 10 pairs (open source parquet bit unpacking vs ours optimized > vectorized SIMD implementation) decode function with bit > width=\{1,2,3,4,5,6,7,8,9,10}, below are test results: > !image-2022-06-15-22-56-08-396.png|width=437,height=223! > We integrated our bit-packing decode implementation into parquet-mr, tested > the parquet batch reader ability from Spark VectorizedParquetRecordReader > which get parquet column data by the batch way. We construct parquet file > with different row count and column count, the column data type is Int32, the > maximum int value is 127 which satisfies bit pack encode with bit width=7, > the count of the row is from 10k to 100 million and the count of the column > is from 1 to 4. > !image-2022-06-15-22-57-15-964.png|width=453,height=229! > !image-2022-06-15-22-58-01-442.png|width=439,height=217! > !image-2022-06-15-22-58-40-704.png|width=415,height=208! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (PARQUET-2159) Parquet bit-packing de/encode optimization
[ https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555116#comment-17555116 ] Fang-Xie edited comment on PARQUET-2159 at 6/16/22 3:14 PM: We implemented Parquet bit packing en/decode using JDK Vector API which can get benefits from SIMD instruction, refer to [https://openjdk.org/jeps/417] The original bit-packing code is generated from [https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java] via maven build. We reimplemented bit-pack functions but it had to compile and run under JDK 16 or higher (needs Vector APIs), So not sure we can contribute code to Parquet community. was (Author: JIRAUSER288151): We implemented Parquet bit packing en/decode using JDK Vector API which can get benefits from SIMD instruction, refer to [https://openjdk.org/jeps/417] The original bit-packing code is generated from [https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java] during maven build. We reimplemented bit-pack functions but it had to compile and run under JDK 16 or higher (needs Vector APIs), So not sure we can contribute code to Parquet community. > Parquet bit-packing de/encode optimization > -- > > Key: PARQUET-2159 > URL: https://issues.apache.org/jira/browse/PARQUET-2159 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Affects Versions: 1.13.0 >Reporter: Fang-Xie >Priority: Major > Fix For: 1.13.0 > > Attachments: image-2022-06-15-22-56-08-396.png, > image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, > image-2022-06-15-22-58-40-704.png > > > Current Spark use Parquet-mr as parquet reader/writer library, but the > built-in bit-packing en/decode is not efficient enough. > Our optimization for Parquet bit-packing en/decode with jdk.incubator.vector > in Open JDK18 brings prominent performance improvement. > Due to Vector API is added to OpenJDK since 16, So this optimization request > JDK16 or higher. > *Below are our test results* > Functional test is based on open-source parquet-mr Bit-pack decoding > function: *_public final void unpack8Values(final byte[] in, final int inPos, > final int[] out, final int outPos)_* __ > compared with our implementation with vector API *_public final void > unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final > int outPos)_* > We tested 10 pairs (open source parquet bit unpacking vs ours optimized > vectorized SIMD implementation) decode function with bit > width=\{1,2,3,4,5,6,7,8,9,10}, below are test results: > !image-2022-06-15-22-56-08-396.png|width=437,height=223! > We integrated our bit-packing decode implementation into parquet-mr, tested > the parquet batch reader ability from Spark VectorizedParquetRecordReader > which get parquet column data by the batch way. We construct parquet file > with different row count and column count, the column data type is Int32, the > maximum int value is 127 which satisfies bit pack encode with bit width=7, > the count of the row is from 10k to 100 million and the count of the column > is from 1 to 4. > !image-2022-06-15-22-57-15-964.png|width=453,height=229! > !image-2022-06-15-22-58-01-442.png|width=439,height=217! > !image-2022-06-15-22-58-40-704.png|width=415,height=208! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [parquet-mr] huaxingao commented on pull request #975: PARQUET-2157: add bloom filter fpp config
huaxingao commented on PR #975: URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157762209 > it should be good enough to also check the lower limit, eg exist > totalCount * (testFpp[i] * 0.9) , or exist > totalCount * (testFpp[i] * 0.5) , or even exist > 0. What do you think? This way, we'll be certain the test passes not because exist is just 0. Thanks for the suggestion! I can't find a reliable number for the lower limit. I put `exist > 0`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config
[ https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555118#comment-17555118 ] ASF GitHub Bot commented on PARQUET-2157: - huaxingao commented on PR #975: URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157762209 > it should be good enough to also check the lower limit, eg exist > totalCount * (testFpp[i] * 0.9) , or exist > totalCount * (testFpp[i] * 0.5) , or even exist > 0. What do you think? This way, we'll be certain the test passes not because exist is just 0. Thanks for the suggestion! I can't find a reliable number for the lower limit. I put `exist > 0`. > Add BloomFilter fpp config > -- > > Key: PARQUET-2157 > URL: https://issues.apache.org/jira/browse/PARQUET-2157 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Huaxin Gao >Priority: Major > > Currently parquet-mr hardcoded bloom filter fpp (false positive probability) > to 0.01. We should have a config to let user to specify fpp. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization
[ https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555116#comment-17555116 ] Fang-Xie commented on PARQUET-2159: --- We implemented Parquet bit packing en/decode using JDK Vector API which can get benefits from SIMD instruction, refer to [https://openjdk.org/jeps/417] The original bit-packing code is generated from [https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java] during maven build. We reimplemented bit-pack functions but it had to compile and run under JDK 16 or higher (needs Vector APIs), So not sure we can contribute code to Parquet community. > Parquet bit-packing de/encode optimization > -- > > Key: PARQUET-2159 > URL: https://issues.apache.org/jira/browse/PARQUET-2159 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Affects Versions: 1.13.0 >Reporter: Fang-Xie >Priority: Major > Fix For: 1.13.0 > > Attachments: image-2022-06-15-22-56-08-396.png, > image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, > image-2022-06-15-22-58-40-704.png > > > Current Spark use Parquet-mr as parquet reader/writer library, but the > built-in bit-packing en/decode is not efficient enough. > Our optimization for Parquet bit-packing en/decode with jdk.incubator.vector > in Open JDK18 brings prominent performance improvement. > Due to Vector API is added to OpenJDK since 16, So this optimization request > JDK16 or higher. > *Below are our test results* > Functional test is based on open-source parquet-mr Bit-pack decoding > function: *_public final void unpack8Values(final byte[] in, final int inPos, > final int[] out, final int outPos)_* __ > compared with our implementation with vector API *_public final void > unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final > int outPos)_* > We tested 10 pairs (open source parquet bit unpacking vs ours optimized > vectorized SIMD implementation) decode function with bit > width=\{1,2,3,4,5,6,7,8,9,10}, below are test results: > !image-2022-06-15-22-56-08-396.png|width=437,height=223! > We integrated our bit-packing decode implementation into parquet-mr, tested > the parquet batch reader ability from Spark VectorizedParquetRecordReader > which get parquet column data by the batch way. We construct parquet file > with different row count and column count, the column data type is Int32, the > maximum int value is 127 which satisfies bit pack encode with bit width=7, > the count of the row is from 10k to 100 million and the count of the column > is from 1 to 4. > !image-2022-06-15-22-57-15-964.png|width=453,height=229! > !image-2022-06-15-22-58-01-442.png|width=439,height=217! > !image-2022-06-15-22-58-40-704.png|width=415,height=208! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config
[ https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555115#comment-17555115 ] ASF GitHub Bot commented on PARQUET-2157: - huaxingao commented on code in PR #975: URL: https://github.com/apache/parquet-mr/pull/975#discussion_r899177750 ## parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java: ## @@ -282,6 +286,63 @@ public void testParquetFileWithBloomFilter() throws IOException { } } + @Test + public void testParquetFileWithBloomFilterWithFpp() throws IOException { +int totalCount = 10; +double[] testFpp = {0.005, 0.01, 0.05, 0.10, 0.15, 0.20, 0.25}; + +Set distinctStrings = new HashSet<>(); +while (distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(12); + distinctStrings.add(str); +} + +MessageType schema = Types.buildMessage(). + required(BINARY).as(stringType()).named("name").named("msg"); + +Configuration conf = new Configuration(); +GroupWriteSupport.setSchema(schema, conf); + +GroupFactory factory = new SimpleGroupFactory(schema); +for (int i = 0; i < testFpp.length; i++) { + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) +.withPageRowCountLimit(10) +.withConf(conf) +.withDictionaryEncoding(false) +.withBloomFilterEnabled("name", true) +.withBloomFilterNDV("name", totalCount) +.withBloomFilterFPP("name", testFpp[i]) +.build()) { +java.util.Iterator iterator = distinctStrings.iterator(); +while (iterator.hasNext()) { + writer.write(factory.newGroup().append("name", iterator.next())); +} + } + distinctStrings.clear(); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration( { +BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); +BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) + .readBloomFilter(blockMetaData.getColumns().get(0)); + +// The exist counts the number of times FindHash returns true. +int exist = 0; +while (distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(10); + if (distinctStrings.add(str) && + bloomFilter.findHash(LongHashFunction.xx(0).hashBytes(Binary.fromString(str).toByteBuffer( { +exist++; + } +} +// The exist should be less than totalCount * fpp. Add 10% here for error space. +assertTrue(exist < totalCount * (testFpp[i] * 1.1)); Review Comment: Yes. Agree. > Add BloomFilter fpp config > -- > > Key: PARQUET-2157 > URL: https://issues.apache.org/jira/browse/PARQUET-2157 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Huaxin Gao >Priority: Major > > Currently parquet-mr hardcoded bloom filter fpp (false positive probability) > to 0.01. We should have a config to let user to specify fpp. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [parquet-mr] huaxingao commented on a diff in pull request #975: PARQUET-2157: add bloom filter fpp config
huaxingao commented on code in PR #975: URL: https://github.com/apache/parquet-mr/pull/975#discussion_r899177750 ## parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java: ## @@ -282,6 +286,63 @@ public void testParquetFileWithBloomFilter() throws IOException { } } + @Test + public void testParquetFileWithBloomFilterWithFpp() throws IOException { +int totalCount = 10; +double[] testFpp = {0.005, 0.01, 0.05, 0.10, 0.15, 0.20, 0.25}; + +Set distinctStrings = new HashSet<>(); +while (distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(12); + distinctStrings.add(str); +} + +MessageType schema = Types.buildMessage(). + required(BINARY).as(stringType()).named("name").named("msg"); + +Configuration conf = new Configuration(); +GroupWriteSupport.setSchema(schema, conf); + +GroupFactory factory = new SimpleGroupFactory(schema); +for (int i = 0; i < testFpp.length; i++) { + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) +.withPageRowCountLimit(10) +.withConf(conf) +.withDictionaryEncoding(false) +.withBloomFilterEnabled("name", true) +.withBloomFilterNDV("name", totalCount) +.withBloomFilterFPP("name", testFpp[i]) +.build()) { +java.util.Iterator iterator = distinctStrings.iterator(); +while (iterator.hasNext()) { + writer.write(factory.newGroup().append("name", iterator.next())); +} + } + distinctStrings.clear(); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration( { +BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); +BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData) + .readBloomFilter(blockMetaData.getColumns().get(0)); + +// The exist counts the number of times FindHash returns true. +int exist = 0; +while (distinctStrings.size() < totalCount) { + String str = RandomStringUtils.randomAlphabetic(10); + if (distinctStrings.add(str) && + bloomFilter.findHash(LongHashFunction.xx(0).hashBytes(Binary.fromString(str).toByteBuffer( { +exist++; + } +} +// The exist should be less than totalCount * fpp. Add 10% here for error space. +assertTrue(exist < totalCount * (testFpp[i] * 1.1)); Review Comment: Yes. Agree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization
[ https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555099#comment-17555099 ] Timothy Miller commented on PARQUET-2159: - I frequently wish Java had a preprocessor like C++ that would solve your problem. Currently, we have to build ParquetMR with Java 8, and plenty of things that depend on it (like Trino and Presto) use Java 11 at the latest. There are some solutions involving runtime loading of class files (e.g. [https://stackoverflow.com/questions/4526113/java-conditional-compilation-how-to-prevent-code-chunks-from-being-compiled),] but there's already enough weirdness in the ParquetMR build process (e.g. compile-time generated code that makes debugging a huge pain) that I hesitate to suggest making it even more challenging. Where actually is the code coming from that you're working on? Is it part of ParquetMR or some other library? I seem to recall that when debugging Trino, the code you're working on has to go through the decompiler in IntelliJ. So if it's already external in some way, then it probably wouldn't hurt to make it just a bit more dynamic, where ParquetMR loads different versions of the bit-packing library depending on the Java version. > Parquet bit-packing de/encode optimization > -- > > Key: PARQUET-2159 > URL: https://issues.apache.org/jira/browse/PARQUET-2159 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Affects Versions: 1.13.0 >Reporter: Fang-Xie >Priority: Major > Fix For: 1.13.0 > > Attachments: image-2022-06-15-22-56-08-396.png, > image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, > image-2022-06-15-22-58-40-704.png > > > Current Spark use Parquet-mr as parquet reader/writer library, but the > built-in bit-packing en/decode is not efficient enough. > Our optimization for Parquet bit-packing en/decode with jdk.incubator.vector > in Open JDK18 brings prominent performance improvement. > Due to Vector API is added to OpenJDK since 16, So this optimization request > JDK16 or higher. > *Below are our test results* > Functional test is based on open-source parquet-mr Bit-pack decoding > function: *_public final void unpack8Values(final byte[] in, final int inPos, > final int[] out, final int outPos)_* __ > compared with our implementation with vector API *_public final void > unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final > int outPos)_* > We tested 10 pairs (open source parquet bit unpacking vs ours optimized > vectorized SIMD implementation) decode function with bit > width=\{1,2,3,4,5,6,7,8,9,10}, below are test results: > !image-2022-06-15-22-56-08-396.png|width=437,height=223! > We integrated our bit-packing decode implementation into parquet-mr, tested > the parquet batch reader ability from Spark VectorizedParquetRecordReader > which get parquet column data by the batch way. We construct parquet file > with different row count and column count, the column data type is Int32, the > maximum int value is 127 which satisfies bit pack encode with bit width=7, > the count of the row is from 10k to 100 million and the count of the column > is from 1 to 4. > !image-2022-06-15-22-57-15-964.png|width=453,height=229! > !image-2022-06-15-22-58-01-442.png|width=439,height=217! > !image-2022-06-15-22-58-40-704.png|width=415,height=208! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time
[ https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yujiang Zhong updated PARQUET-2160: --- Description: The decompressed stream in HeapBytesDecompressor$decompress now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think we can close decompressed stream mannually in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). was: The decompressed stream now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think we can close decompressed stream mannually in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). > Close decompression stream to free off-heap memory in time > -- > > Key: PARQUET-2160 > URL: https://issues.apache.org/jira/browse/PARQUET-2160 > Project: Parquet > Issue Type: Improvement > Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni > 1.4.9.1 + glibc >Reporter: Yujiang Zhong >Priority: Major > > The decompressed stream in HeapBytesDecompressor$decompress now relies on the > JVM GC to close. When reading parquet in zstd compressed format, sometimes I > ran into OOM cause high off-heap usage. I think the reason is that the GC is > not timely and causes off-heap memory fragmentation. I had to set lower > MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. > There is a > [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] > of this zstd parquet issus in Iceberg community slack: some people had the > same problem. > I think we can close decompressed stream mannually in time to solve this > problem: > > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.from(is, uncompressedSize); > -> > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, > uncompressedSize)); > is.close(); > > After I made this change to decompress, I found off-heap memory is > significantly reduced (with same query on same data). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2160) Close decompression stream to free off-heap memory in time
[ https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555082#comment-17555082 ] Yujiang Zhong commented on PARQUET-2160: [~shangxinli] [~dongjoon] Can you please take a look at this? > Close decompression stream to free off-heap memory in time > -- > > Key: PARQUET-2160 > URL: https://issues.apache.org/jira/browse/PARQUET-2160 > Project: Parquet > Issue Type: Improvement > Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni > 1.4.9.1 + glibc >Reporter: Yujiang Zhong >Priority: Major > > The decompressed stream now relies on the JVM GC to close. When reading > parquet in zstd compressed format, sometimes I ran into OOM cause high > off-heap usage. I think the reason is that the GC is not timely and causes > off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to > make glibc give back memory to system quickly. There is a > [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] > of this zstd parquet issus in Iceberg community slack: some people had the > same problem. > I think we can close decompressed stream mannually in time to solve this > problem: > > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.from(is, uncompressedSize); > -> > InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); > decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, > uncompressedSize)); > is.close(); > > After I made this change to decompress, I found off-heap memory is > significantly reduced (with same query on same data). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (PARQUET-2160) Close decompression stream to free off-heap memory in time
Yujiang Zhong created PARQUET-2160: -- Summary: Close decompression stream to free off-heap memory in time Key: PARQUET-2160 URL: https://issues.apache.org/jira/browse/PARQUET-2160 Project: Parquet Issue Type: Improvement Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 1.4.9.1 + glibc Reporter: Yujiang Zhong The decompressed stream now relies on the JVM GC to close. When reading parquet in zstd compressed format, sometimes I ran into OOM cause high off-heap usage. I think the reason is that the GC is not timely and causes off-heap memory fragmentation. I had to set lower MALLOC_TRIM_THRESHOLD_ to make glibc give back memory to system quickly. There is a [thread|[https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]] of this zstd parquet issus in Iceberg community slack: some people had the same problem. I think we can close decompressed stream mannually in time to solve this problem: InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); -> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.{_}copy{_}(BytesInput.{_}from{_}(is, uncompressedSize)); is.close(); After I made this change to decompress, I found off-heap memory is significantly reduced (with same query on same data). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization
[ https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555081#comment-17555081 ] Fang-Xie commented on PARQUET-2159: --- Thanks [~theosib-amazon], these improvements depend on Vector API in JDK 16 or higher version, how do we consider about JDK version issue? > Parquet bit-packing de/encode optimization > -- > > Key: PARQUET-2159 > URL: https://issues.apache.org/jira/browse/PARQUET-2159 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Affects Versions: 1.13.0 >Reporter: Fang-Xie >Priority: Major > Fix For: 1.13.0 > > Attachments: image-2022-06-15-22-56-08-396.png, > image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, > image-2022-06-15-22-58-40-704.png > > > Current Spark use Parquet-mr as parquet reader/writer library, but the > built-in bit-packing en/decode is not efficient enough. > Our optimization for Parquet bit-packing en/decode with jdk.incubator.vector > in Open JDK18 brings prominent performance improvement. > Due to Vector API is added to OpenJDK since 16, So this optimization request > JDK16 or higher. > *Below are our test results* > Functional test is based on open-source parquet-mr Bit-pack decoding > function: *_public final void unpack8Values(final byte[] in, final int inPos, > final int[] out, final int outPos)_* __ > compared with our implementation with vector API *_public final void > unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final > int outPos)_* > We tested 10 pairs (open source parquet bit unpacking vs ours optimized > vectorized SIMD implementation) decode function with bit > width=\{1,2,3,4,5,6,7,8,9,10}, below are test results: > !image-2022-06-15-22-56-08-396.png|width=437,height=223! > We integrated our bit-packing decode implementation into parquet-mr, tested > the parquet batch reader ability from Spark VectorizedParquetRecordReader > which get parquet column data by the batch way. We construct parquet file > with different row count and column count, the column data type is Int32, the > maximum int value is 127 which satisfies bit pack encode with bit width=7, > the count of the row is from 10k to 100 million and the count of the column > is from 1 to 4. > !image-2022-06-15-22-57-15-964.png|width=453,height=229! > !image-2022-06-15-22-58-01-442.png|width=439,height=217! > !image-2022-06-15-22-58-40-704.png|width=415,height=208! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (PARQUET-2051) AvroWriteSupport does not pass Configuration to AvroSchemaConverter on Creation
[ https://issues.apache.org/jira/browse/PARQUET-2051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andreas Hailu updated PARQUET-2051: --- Fix Version/s: 1.12.3 > AvroWriteSupport does not pass Configuration to AvroSchemaConverter on > Creation > --- > > Key: PARQUET-2051 > URL: https://issues.apache.org/jira/browse/PARQUET-2051 > Project: Parquet > Issue Type: Bug >Reporter: Andreas Hailu >Assignee: Andreas Hailu >Priority: Major > Fix For: 1.12.3 > > > Because of this, we're unable to fully leverage the ThreeLevelListWriter > functionality when trying to write Avro lists out using Parquet through the > AvroParquetOutputFormat. > The following record is used for testing: > Schema: > { "type": "record", "name": "NullLists", "namespace": "com.test", "fields": [ > \{ "name": "KeyID", "type": "string" }, \{ "name": "NullableList", "type": [ > "null", { "type": "array", "items": [ "null", "string" ] } ], "default": null > } ] } > Record (using basic JSON just for display purposes): > { "KeyID": "0", "NullableList": [ "foo", null, "baz" ] } > During testing, we see the following exception: > {quote}{{Caused by: java.lang.ClassCastException: repeated binary array > (STRING) is not a group}} > \{{ at org.apache.parquet.schema.Type.asGroupType(Type.java:250)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport$ThreeLevelListWriter.writeCollection(AvroWriteSupport.java:612)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:397)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)}} > \{{ at > org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)}} > \{{ at > org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128}} > {quote} > Upon review, it was found that the configuration option that was set in > AvroWriteSupport for the ThreeLevelListWriter, > parquet.avro.write-old-list-structure being set to false, was never shared > with the AvroSchemaConverter. > Once we made this change and tested locally, we observe the record with nulls > in the array being successfully written by AvroParquetOutputFormat. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config
[ https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555041#comment-17555041 ] ASF GitHub Bot commented on PARQUET-2157: - ggershinsky commented on PR #975: URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157577513 > The test takes about 2300 milli seconds on my laptop. Ok, this is reasonable. If this time is sufficient for reliably testing the upper limit of FPPs, it should be good enough to also check the lower limit, eg `exist > totalCount * (testFpp[i] * 0.9)` , or `exist > totalCount * (testFpp[i] * 0.5)` , or even `exist > 0`. What do you think? This way, we'll be certain the test passes not because `exist` is just 0. > Add BloomFilter fpp config > -- > > Key: PARQUET-2157 > URL: https://issues.apache.org/jira/browse/PARQUET-2157 > Project: Parquet > Issue Type: Improvement > Components: parquet-mr >Reporter: Huaxin Gao >Priority: Major > > Currently parquet-mr hardcoded bloom filter fpp (false positive probability) > to 0.01. We should have a config to let user to specify fpp. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [parquet-mr] ggershinsky commented on pull request #975: PARQUET-2157: add bloom filter fpp config
ggershinsky commented on PR #975: URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157577513 > The test takes about 2300 milli seconds on my laptop. Ok, this is reasonable. If this time is sufficient for reliably testing the upper limit of FPPs, it should be good enough to also check the lower limit, eg `exist > totalCount * (testFpp[i] * 0.9)` , or `exist > totalCount * (testFpp[i] * 0.5)` , or even `exist > 0`. What do you think? This way, we'll be certain the test passes not because `exist` is just 0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org