[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-06-16 Thread Yujiang Zhong (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yujiang Zhong updated PARQUET-2160:
---
Description: 
The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
JVM GC to be closed. When reading Parquet files in zstd compressed format, I 
sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
that GC is not timely enough, which leads to off-heap memory fragmentation. I 
had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the 
system quickly. There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can use ByteArrayBytesInput as the decompressed bytes input and close 
the decompression stream promptly to solve this problem:
{code:java}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.from(is, uncompressedSize); {code}
->
{code:java}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
is.close(); {code}
After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).
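For illustration, a minimal sketch of how the change could look inside 
HeapBytesDecompressor.decompress (the surrounding method shape, field names and 
null check are assumptions for the sketch, not the exact parquet-mr code):
{code:java}
// Hedged sketch: materialize the decompressed bytes on the heap and close the
// codec stream immediately, instead of leaving the close to GC/finalization.
BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
  if (codec == null) {
    return bytes;
  }
  InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
  try {
    // BytesInput.copy reads the stream into a byte array, so the underlying
    // (possibly off-heap) zstd resources can be released right away.
    return BytesInput.copy(BytesInput.from(is, uncompressedSize));
  } finally {
    is.close();
  }
}
{code}
Using try/finally (or try-with-resources) also guarantees the close on error 
paths, which a bare is.close() after the copy would skip.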

  was:
The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
JVM GC to be closed. When reading Parquet files in zstd compressed format, I 
sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
that GC is not timely enough, which leads to off-heap memory fragmentation. I 
had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the 
system quickly. There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can use ByteArrayBytesInput as the decompressed bytes input and close 
the decompression stream promptly to solve this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).


> Close decompression stream to free off-heap memory in time
> --
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
>  Issue Type: Improvement
> Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
> 1.4.9.1 + glibc
>Reporter: Yujiang Zhong
>Priority: Major
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies 
> on JVM GC to be closed. When reading Parquet files in zstd compressed format, 
> I sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
> that GC is not timely enough, which leads to off-heap memory fragmentation. I 
> had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to 
> the system quickly. There is a 
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
>  about this zstd Parquet issue in the Iceberg community Slack: some people had 
> the same problem. 
> I think we can use ByteArrayBytesInput as the decompressed bytes input and 
> close the decompression stream promptly to solve this problem:
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize); {code}
> ->
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close(); {code}
> After I made this change to decompress, I found that off-heap memory usage is 
> significantly reduced (for the same query on the same data).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-06-16 Thread Yujiang Zhong (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yujiang Zhong updated PARQUET-2160:
---
Description: 
The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
JVM GC to be closed. When reading Parquet files in zstd compressed format, I 
sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
that GC is not timely enough, which leads to off-heap memory fragmentation. I 
had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the 
system quickly. There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can use ByteArrayBytesInput as the decompressed bytes input and close 
the decompression stream promptly to solve this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).

  was:
The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
JVM GC to be closed. When reading Parquet files in zstd compressed format, I 
sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
that GC is not timely enough, which leads to off-heap memory fragmentation. I 
had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the 
system quickly. There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can close the decompression stream manually and promptly to solve 
this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).


> Close decompression stream to free off-heap memory in time
> --
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
>  Issue Type: Improvement
> Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
> 1.4.9.1 + glibc
>Reporter: Yujiang Zhong
>Priority: Major
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies 
> on JVM GC to be closed. When reading Parquet files in zstd compressed format, 
> I sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
> that GC is not timely enough, which leads to off-heap memory fragmentation. I 
> had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to 
> the system quickly. There is a 
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
>  about this zstd Parquet issue in the Iceberg community Slack: some people had 
> the same problem. 
> I think we can use ByteArrayBytesInput as the decompressed bytes input and 
> close the decompression stream promptly to solve this problem:
>  
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize);
> ->
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close();
>  
> After I made this change to decompress, I found that off-heap memory usage is 
> significantly reduced (for the same query on the same data).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-06-16 Thread Timothy Miller (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555142#comment-17555142
 ] 

Timothy Miller commented on PARQUET-2159:
-

If this is already being generated at runtime, then it sounds to me like you 
might be able to generate different things depending on the compiler's source 
version setting.

> Parquet bit-packing de/encode optimization
> --
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its Parquet reader/writer library, but the 
> built-in bit-packing en/decoding is not efficient enough. 
> Our optimization of Parquet bit-packing en/decoding with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since version 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_*
> compared with our Vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs of decode functions (open-source Parquet bit unpacking vs. 
> our optimized vectorized SIMD implementation) with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; the test results are below:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the Parquet batch reader through Spark's VectorizedParquetRecordReader, which 
> fetches Parquet column data in batches. We constructed Parquet files with 
> different row and column counts; the column data type is Int32 and the maximum 
> value is 127, which fits bit-pack encoding with bit width=7. The row count 
> ranges from 10k to 100 million and the column count from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-06-16 Thread Fang-Xie (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555116#comment-17555116
 ] 

Fang-Xie edited comment on PARQUET-2159 at 6/16/22 3:14 PM:


We implemented Parquet bit-packing en/decoding using the JDK Vector API, which 
benefits from SIMD instructions; see [https://openjdk.org/jeps/417].

The original bit-packing code is generated from 
[https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java]
during the Maven build. We reimplemented the bit-pack functions, but they have 
to be compiled and run under JDK 16 or higher (they need the Vector API), so we 
are not sure we can contribute the code to the Parquet community.
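For illustration only, here is a hedged sketch of the trivial bit width = 8 case 
with jdk.incubator.vector; this is not our actual implementation, just the shape 
of the idea (it needs JDK 16+ and --add-modules jdk.incubator.vector):
{code:java}
import jdk.incubator.vector.ByteVector;
import jdk.incubator.vector.IntVector;
import jdk.incubator.vector.Vector;
import jdk.incubator.vector.VectorOperators;

public final class VectorUnpackSketch {
  // Unpack 8 values of bit width 8: widen one 64-bit group of byte lanes
  // to 8 int lanes in a single vector conversion.
  public static void unpack8Values_bw8(byte[] in, int inPos, int[] out, int outPos) {
    ByteVector bytes = ByteVector.fromArray(ByteVector.SPECIES_64, in, inPos);
    Vector<Integer> widened =
        bytes.convertShape(VectorOperators.B2I, IntVector.SPECIES_256, 0);
    // B2I sign-extends, so mask each lane to recover the unsigned packed value.
    ((IntVector) widened).and(0xFF).intoArray(out, outPos);
  }
}
{code}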

 

 

 


was (Author: JIRAUSER288151):
We implemented Parquet bit-packing en/decoding using the JDK Vector API, which 
benefits from SIMD instructions; see [https://openjdk.org/jeps/417].

The original bit-packing code is generated from 
[https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java]
during the Maven build. We reimplemented the bit-pack functions, but they have 
to be compiled and run under JDK 16 or higher (they need the Vector API), so we 
are not sure we can contribute the code to the Parquet community.

 

 

 

> Parquet bit-packing de/encode optimization
> --
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its Parquet reader/writer library, but the 
> built-in bit-packing en/decoding is not efficient enough. 
> Our optimization of Parquet bit-packing en/decoding with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since version 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_*
> compared with our Vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs of decode functions (open-source Parquet bit unpacking vs. 
> our optimized vectorized SIMD implementation) with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; the test results are below:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the Parquet batch reader through Spark's VectorizedParquetRecordReader, which 
> fetches Parquet column data in batches. We constructed Parquet files with 
> different row and column counts; the column data type is Int32 and the maximum 
> value is 127, which fits bit-pack encoding with bit width=7. The row count 
> ranges from 10k to 100 million and the column count from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] huaxingao commented on pull request #975: PARQUET-2157: add bloom filter fpp config

2022-06-16 Thread GitBox


huaxingao commented on PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157762209

   > it should be good enough to also check the lower limit, eg exist > 
totalCount * (testFpp[i] * 0.9) , or exist > totalCount * (testFpp[i] * 0.5) , 
or even exist > 0. What do you think? This way, we'll be certain the test 
passes not because exist is just 0.
   
   Thanks for the suggestion! I can't find a reliable number for the lower 
limit. I put `exist > 0`. 
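
   For reference, a small sketch of the assertion with both bounds (names follow 
the test in this PR; the exact lower bound is the judgment call discussed above):

```java
// Upper bound: observed false positives must stay below the requested fpp
// (with 10% slack). Lower bound: at least one probe must hit, so the test
// cannot pass trivially with exist == 0.
assertTrue("fpp too high for " + testFpp[i], exist < totalCount * (testFpp[i] * 1.1));
assertTrue("expected at least one false positive", exist > 0);
```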


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config

2022-06-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555118#comment-17555118
 ] 

ASF GitHub Bot commented on PARQUET-2157:
-

huaxingao commented on PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157762209

   > it should be good enough to also check the lower limit, eg exist > 
totalCount * (testFpp[i] * 0.9) , or exist > totalCount * (testFpp[i] * 0.5) , 
or even exist > 0. What do you think? This way, we'll be certain the test 
passes not because exist is just 0.
   
   Thanks for the suggestion! I can't find a reliable number for the lower 
limit. I put `exist > 0`. 




> Add BloomFilter fpp config
> --
>
> Key: PARQUET-2157
> URL: https://issues.apache.org/jira/browse/PARQUET-2157
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Huaxin Gao
>Priority: Major
>
> Currently parquet-mr hardcodes the bloom filter fpp (false positive 
> probability) to 0.01. We should have a config to let users specify the fpp.
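
For context, a minimal sketch of how a writer could set the proposed per-column 
fpp through the builder API exercised by the PR's test (column name and values 
here are illustrative):
{code:java}
ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
    .withConf(conf)
    .withBloomFilterEnabled("name", true)
    .withBloomFilterNDV("name", 10000)   // expected number of distinct values
    .withBloomFilterFPP("name", 0.05)    // requested false positive probability
    .build();
{code}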



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-06-16 Thread Fang-Xie (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555116#comment-17555116
 ] 

Fang-Xie commented on PARQUET-2159:
---

We implemented Parquet bit-packing en/decoding using the JDK Vector API, which 
benefits from SIMD instructions; see [https://openjdk.org/jeps/417].

The original bit-packing code is generated from 
[https://github.com/apache/parquet-mr/blob/master/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java]
during the Maven build. We reimplemented the bit-pack functions, but they have 
to be compiled and run under JDK 16 or higher (they need the Vector API), so we 
are not sure we can contribute the code to the Parquet community.

 

 

 

> Parquet bit-packing de/encode optimization
> --
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its Parquet reader/writer library, but the 
> built-in bit-packing en/decoding is not efficient enough. 
> Our optimization of Parquet bit-packing en/decoding with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since version 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_*
> compared with our Vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs of decode functions (open-source Parquet bit unpacking vs. 
> our optimized vectorized SIMD implementation) with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; the test results are below:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the Parquet batch reader through Spark's VectorizedParquetRecordReader, which 
> fetches Parquet column data in batches. We constructed Parquet files with 
> different row and column counts; the column data type is Int32 and the maximum 
> value is 127, which fits bit-pack encoding with bit width=7. The row count 
> ranges from 10k to 100 million and the column count from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config

2022-06-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555115#comment-17555115
 ] 

ASF GitHub Bot commented on PARQUET-2157:
-

huaxingao commented on code in PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#discussion_r899177750


##
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java:
##
@@ -282,6 +286,63 @@ public void testParquetFileWithBloomFilter() throws 
IOException {
 }
   }
 
+  @Test
+  public void testParquetFileWithBloomFilterWithFpp() throws IOException {
+int totalCount = 10;
+double[] testFpp = {0.005, 0.01, 0.05, 0.10, 0.15, 0.20, 0.25};
+
+Set<String> distinctStrings = new HashSet<>();
+while (distinctStrings.size() < totalCount) {
+  String str = RandomStringUtils.randomAlphabetic(12);
+  distinctStrings.add(str);
+}
+
+MessageType schema = Types.buildMessage().
+  required(BINARY).as(stringType()).named("name").named("msg");
+
+Configuration conf = new Configuration();
+GroupWriteSupport.setSchema(schema, conf);
+
+GroupFactory factory = new SimpleGroupFactory(schema);
+for (int i = 0; i < testFpp.length; i++) {
+  File file = temp.newFile();
+  file.delete();
+  Path path = new Path(file.getAbsolutePath());
+  try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+.withPageRowCountLimit(10)
+.withConf(conf)
+.withDictionaryEncoding(false)
+.withBloomFilterEnabled("name", true)
+.withBloomFilterNDV("name", totalCount)
+.withBloomFilterFPP("name", testFpp[i])
+.build()) {
+java.util.Iterator<String> iterator = distinctStrings.iterator();
+while (iterator.hasNext()) {
+  writer.write(factory.newGroup().append("name", iterator.next()));
+}
+  }
+  distinctStrings.clear();
+
+  try (ParquetFileReader reader =
+ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+  .readBloomFilter(blockMetaData.getColumns().get(0));
+
+// The exist counts the number of times findHash returns true.
+int exist = 0;
+while (distinctStrings.size() < totalCount) {
+  String str = RandomStringUtils.randomAlphabetic(10);
+  if (distinctStrings.add(str) &&
+bloomFilter.findHash(LongHashFunction.xx(0).hashBytes(Binary.fromString(str).toByteBuffer()))) {
+exist++;
+  }
+}
+// The exist should be less than totalCount * fpp. Add 10% here for error space.
+assertTrue(exist < totalCount * (testFpp[i] * 1.1));

Review Comment:
   Yes. Agree.





> Add BloomFilter fpp config
> --
>
> Key: PARQUET-2157
> URL: https://issues.apache.org/jira/browse/PARQUET-2157
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Huaxin Gao
>Priority: Major
>
> Currently parquet-mr hardcodes the bloom filter fpp (false positive 
> probability) to 0.01. We should have a config to let users specify the fpp.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] huaxingao commented on a diff in pull request #975: PARQUET-2157: add bloom filter fpp config

2022-06-16 Thread GitBox


huaxingao commented on code in PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#discussion_r899177750


##
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java:
##
@@ -282,6 +286,63 @@ public void testParquetFileWithBloomFilter() throws 
IOException {
 }
   }
 
+  @Test
+  public void testParquetFileWithBloomFilterWithFpp() throws IOException {
+int totalCount = 10;
+double[] testFpp = {0.005, 0.01, 0.05, 0.10, 0.15, 0.20, 0.25};
+
+Set<String> distinctStrings = new HashSet<>();
+while (distinctStrings.size() < totalCount) {
+  String str = RandomStringUtils.randomAlphabetic(12);
+  distinctStrings.add(str);
+}
+
+MessageType schema = Types.buildMessage().
+  required(BINARY).as(stringType()).named("name").named("msg");
+
+Configuration conf = new Configuration();
+GroupWriteSupport.setSchema(schema, conf);
+
+GroupFactory factory = new SimpleGroupFactory(schema);
+for (int i = 0; i < testFpp.length; i++) {
+  File file = temp.newFile();
+  file.delete();
+  Path path = new Path(file.getAbsolutePath());
+  try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+.withPageRowCountLimit(10)
+.withConf(conf)
+.withDictionaryEncoding(false)
+.withBloomFilterEnabled("name", true)
+.withBloomFilterNDV("name", totalCount)
+.withBloomFilterFPP("name", testFpp[i])
+.build()) {
+java.util.Iterator<String> iterator = distinctStrings.iterator();
+while (iterator.hasNext()) {
+  writer.write(factory.newGroup().append("name", iterator.next()));
+}
+  }
+  distinctStrings.clear();
+
+  try (ParquetFileReader reader =
+ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+  .readBloomFilter(blockMetaData.getColumns().get(0));
+
+// The exist counts the number of times findHash returns true.
+int exist = 0;
+while (distinctStrings.size() < totalCount) {
+  String str = RandomStringUtils.randomAlphabetic(10);
+  if (distinctStrings.add(str) &&
+bloomFilter.findHash(LongHashFunction.xx(0).hashBytes(Binary.fromString(str).toByteBuffer()))) {
+exist++;
+  }
+}
+// The exist should be less than totalCount * fpp. Add 10% here for error space.
+assertTrue(exist < totalCount * (testFpp[i] * 1.1));

Review Comment:
   Yes. Agree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-06-16 Thread Timothy Miller (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555099#comment-17555099
 ] 

Timothy Miller commented on PARQUET-2159:
-

I frequently wish Java had a preprocessor like C++ that would solve your 
problem. Currently, we have to build ParquetMR with Java 8, and plenty of 
things that depend on it (like Trino and Presto) use Java 11 at the latest. 
There are some solutions involving runtime loading of class files (e.g. 
[https://stackoverflow.com/questions/4526113/java-conditional-compilation-how-to-prevent-code-chunks-from-being-compiled]), 
but there's already enough weirdness in the ParquetMR build process (e.g. 
compile-time generated code that makes debugging a huge pain) that I hesitate 
to suggest making it even more challenging.

Where actually is the code coming from that you're working on? Is it part of 
ParquetMR or some other library? I seem to recall that when debugging Trino, 
the code you're working on has to go through the decompiler in IntelliJ. So if 
it's already external in some way, then it probably wouldn't hurt to make it 
just a bit more dynamic, where ParquetMR loads different versions of the 
bit-packing library depending on the Java version.
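
For what it's worth, a rough sketch of that idea (the vectorized class name is a 
hypothetical placeholder, not an existing parquet-mr class; only 
Packer.LITTLE_ENDIAN.newBytePacker is real API):
{code:java}
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;

// Hedged sketch: pick a bit-packing implementation at runtime based on the JVM
// version, falling back to the existing scalar packers when the vectorized
// classes are unavailable.
public final class BitPackerLoader {

  // Works on Java 8+: "1.8" -> 8, "11" -> 11, "17" -> 17, ...
  static int javaMajorVersion() {
    String v = System.getProperty("java.specification.version");
    return Integer.parseInt(v.startsWith("1.") ? v.substring(2) : v);
  }

  public static BytePacker newBytePacker(int bitWidth) {
    if (javaMajorVersion() >= 16) {
      try {
        // Hypothetical vectorized implementation, compiled and shipped separately.
        Class<?> clazz = Class.forName(
            "org.apache.parquet.column.values.bitpacking.VectorBytePackerFactory");
        return (BytePacker) clazz.getMethod("newBytePacker", int.class)
            .invoke(null, bitWidth);
      } catch (ReflectiveOperationException e) {
        // Vectorized classes not on the classpath or not loadable; fall through.
      }
    }
    return Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
  }
}
{code}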

> Parquet bit-packing de/encode optimization
> --
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its Parquet reader/writer library, but the 
> built-in bit-packing en/decoding is not efficient enough. 
> Our optimization of Parquet bit-packing en/decoding with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since version 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_*
> compared with our Vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs of decode functions (open-source Parquet bit unpacking vs. 
> our optimized vectorized SIMD implementation) with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; the test results are below:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the Parquet batch reader through Spark's VectorizedParquetRecordReader, which 
> fetches Parquet column data in batches. We constructed Parquet files with 
> different row and column counts; the column data type is Int32 and the maximum 
> value is 127, which fits bit-pack encoding with bit width=7. The row count 
> ranges from 10k to 100 million and the column count from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-06-16 Thread Yujiang Zhong (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yujiang Zhong updated PARQUET-2160:
---
Description: 
The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
JVM GC to be closed. When reading Parquet files in zstd compressed format, I 
sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
that GC is not timely enough, which leads to off-heap memory fragmentation. I 
had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the 
system quickly. There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can close the decompression stream manually and promptly to solve 
this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).

  was:
The decompressed stream currently relies on JVM GC to be closed. When reading 
Parquet files in zstd compressed format, I sometimes ran into OOMs caused by 
high off-heap usage. I think the reason is that GC is not timely enough, which 
leads to off-heap memory fragmentation. I had to set a lower 
MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. 
There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can close the decompression stream manually and promptly to solve 
this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).


> Close decompression stream to free off-heap memory in time
> --
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
>  Issue Type: Improvement
> Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
> 1.4.9.1 + glibc
>Reporter: Yujiang Zhong
>Priority: Major
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies 
> on JVM GC to be closed. When reading Parquet files in zstd compressed format, 
> I sometimes ran into OOMs caused by high off-heap usage. I think the reason is 
> that GC is not timely enough, which leads to off-heap memory fragmentation. I 
> had to set a lower MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to 
> the system quickly. There is a 
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
>  about this zstd Parquet issue in the Iceberg community Slack: some people had 
> the same problem. 
> I think we can close the decompression stream manually and promptly to solve 
> this problem:
>  
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize);
> ->
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close();
>  
> After I made this change to decompress, I found that off-heap memory usage is 
> significantly reduced (for the same query on the same data).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-06-16 Thread Yujiang Zhong (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555082#comment-17555082
 ] 

Yujiang Zhong commented on PARQUET-2160:


[~shangxinli] [~dongjoon] Can you please take a look at this?

> Close decompression stream to free off-heap memory in time
> --
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
>  Issue Type: Improvement
> Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
> 1.4.9.1 + glibc
>Reporter: Yujiang Zhong
>Priority: Major
>
> The decompressed stream currently relies on JVM GC to be closed. When reading 
> Parquet files in zstd compressed format, I sometimes ran into OOMs caused by 
> high off-heap usage. I think the reason is that GC is not timely enough, which 
> leads to off-heap memory fragmentation. I had to set a lower 
> MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. 
> There is a 
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
>  about this zstd Parquet issue in the Iceberg community Slack: some people had 
> the same problem. 
> I think we can close the decompression stream manually and promptly to solve 
> this problem:
>  
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize);
> ->
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close();
>  
> After I made this change to decompress, I found that off-heap memory usage is 
> significantly reduced (for the same query on the same data).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-06-16 Thread Yujiang Zhong (Jira)
Yujiang Zhong created PARQUET-2160:
--

 Summary: Close decompression stream to free off-heap memory in time
 Key: PARQUET-2160
 URL: https://issues.apache.org/jira/browse/PARQUET-2160
 Project: Parquet
  Issue Type: Improvement
 Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
1.4.9.1 + glibc
Reporter: Yujiang Zhong


The decompressed stream currently relies on JVM GC to be closed. When reading 
Parquet files in zstd compressed format, I sometimes ran into OOMs caused by 
high off-heap usage. I think the reason is that GC is not timely enough, which 
leads to off-heap memory fragmentation. I had to set a lower 
MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. 
There is a 
[thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
 about this zstd Parquet issue in the Iceberg community Slack: some people had 
the same problem.

I think we can close the decompression stream manually and promptly to solve 
this problem:

 

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.from(is, uncompressedSize);

->

InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));

is.close();

 

After I made this change to decompress, I found that off-heap memory usage is 
significantly reduced (for the same query on the same data).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2159) Parquet bit-packing de/encode optimization

2022-06-16 Thread Fang-Xie (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555081#comment-17555081
 ] 

Fang-Xie commented on PARQUET-2159:
---

Thanks [~theosib-amazon]. These improvements depend on the Vector API in JDK 16 
or higher; how should we handle the JDK version issue?

> Parquet bit-packing de/encode optimization
> --
>
> Key: PARQUET-2159
> URL: https://issues.apache.org/jira/browse/PARQUET-2159
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Fang-Xie
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: image-2022-06-15-22-56-08-396.png, 
> image-2022-06-15-22-57-15-964.png, image-2022-06-15-22-58-01-442.png, 
> image-2022-06-15-22-58-40-704.png
>
>
> Spark currently uses parquet-mr as its Parquet reader/writer library, but the 
> built-in bit-packing en/decoding is not efficient enough. 
> Our optimization of Parquet bit-packing en/decoding with jdk.incubator.vector 
> in OpenJDK 18 brings a prominent performance improvement.
> Because the Vector API has been part of OpenJDK since version 16, this 
> optimization requires JDK 16 or higher.
> *Below are our test results*
> The functional test is based on the open-source parquet-mr bit-pack decoding 
> function *_public final void unpack8Values(final byte[] in, final int inPos, 
> final int[] out, final int outPos)_*
> compared with our Vector API implementation *_public final void 
> unpack8Values_vec(final byte[] in, final int inPos, final int[] out, final 
> int outPos)_*.
> We tested 10 pairs of decode functions (open-source Parquet bit unpacking vs. 
> our optimized vectorized SIMD implementation) with bit 
> width=\{1,2,3,4,5,6,7,8,9,10}; the test results are below:
> !image-2022-06-15-22-56-08-396.png|width=437,height=223!
> We integrated our bit-packing decode implementation into parquet-mr and tested 
> the Parquet batch reader through Spark's VectorizedParquetRecordReader, which 
> fetches Parquet column data in batches. We constructed Parquet files with 
> different row and column counts; the column data type is Int32 and the maximum 
> value is 127, which fits bit-pack encoding with bit width=7. The row count 
> ranges from 10k to 100 million and the column count from 1 to 4.
> !image-2022-06-15-22-57-15-964.png|width=453,height=229!
> !image-2022-06-15-22-58-01-442.png|width=439,height=217!
> !image-2022-06-15-22-58-40-704.png|width=415,height=208!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (PARQUET-2051) AvroWriteSupport does not pass Configuration to AvroSchemaConverter on Creation

2022-06-16 Thread Andreas Hailu (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-2051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Hailu updated PARQUET-2051:
---
Fix Version/s: 1.12.3

> AvroWriteSupport does not pass Configuration to AvroSchemaConverter on 
> Creation
> ---
>
> Key: PARQUET-2051
> URL: https://issues.apache.org/jira/browse/PARQUET-2051
> Project: Parquet
>  Issue Type: Bug
>Reporter: Andreas Hailu
>Assignee: Andreas Hailu
>Priority: Major
> Fix For: 1.12.3
>
>
> Because of this, we're unable to fully leverage the ThreeLevelListWriter 
> functionality when trying to write Avro lists out using Parquet through the 
> AvroParquetOutputFormat.
> The following record is used for testing:
>  Schema:
> { "type": "record", "name": "NullLists", "namespace": "com.test", "fields": [ 
> \{ "name": "KeyID", "type": "string" }, \{ "name": "NullableList", "type": [ 
> "null", { "type": "array", "items": [ "null", "string" ] } ], "default": null 
> } ] }
> Record (using basic JSON just for display purposes):
> { "KeyID": "0", "NullableList": [ "foo", null, "baz" ] }
> During testing, we see the following exception:
> {quote}
> Caused by: java.lang.ClassCastException: repeated binary array (STRING) is not a group
>   at org.apache.parquet.schema.Type.asGroupType(Type.java:250)
>   at org.apache.parquet.avro.AvroWriteSupport$ThreeLevelListWriter.writeCollection(AvroWriteSupport.java:612)
>   at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:397)
>   at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
>   at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
>   at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
>   at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
>   at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
> {quote}
> Upon review, we found that the configuration option set in AvroWriteSupport 
> for the ThreeLevelListWriter, parquet.avro.write-old-list-structure=false, was 
> never passed on to the AvroSchemaConverter.
> Once we made this change and tested locally, we observed the record with nulls 
> in the array being successfully written by AvroParquetOutputFormat. 
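
For context, a minimal sketch of how a job opts into the three-level list 
structure that this fix makes effective (standard Hadoop/Avro output setup 
assumed; only the write-old-list-structure key is the one discussed here):
{code:java}
Configuration conf = job.getConfiguration();
// false selects the three-level list layout handled by ThreeLevelListWriter;
// with the fix, this setting now also reaches AvroSchemaConverter.
conf.setBoolean("parquet.avro.write-old-list-structure", false);
AvroParquetOutputFormat.setSchema(job, schema);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
{code}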



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (PARQUET-2157) Add BloomFilter fpp config

2022-06-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555041#comment-17555041
 ] 

ASF GitHub Bot commented on PARQUET-2157:
-

ggershinsky commented on PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157577513

   > The test takes about 2300 milli seconds on my laptop.
   
   Ok, this is reasonable. If this time is sufficient for reliably testing the 
upper limit of FPPs, it should be good enough to also check the lower limit, eg 
`exist > totalCount * (testFpp[i] * 0.9)` , or `exist > totalCount * 
(testFpp[i] * 0.5)` , or even `exist > 0`. What do you think? This way, we'll 
be certain the test passes not because `exist` is just 0.




> Add BloomFilter fpp config
> --
>
> Key: PARQUET-2157
> URL: https://issues.apache.org/jira/browse/PARQUET-2157
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-mr
>Reporter: Huaxin Gao
>Priority: Major
>
> Currently parquet-mr hardcodes the bloom filter fpp (false positive 
> probability) to 0.01. We should have a config to let users specify the fpp.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [parquet-mr] ggershinsky commented on pull request #975: PARQUET-2157: add bloom filter fpp config

2022-06-16 Thread GitBox


ggershinsky commented on PR #975:
URL: https://github.com/apache/parquet-mr/pull/975#issuecomment-1157577513

   > The test takes about 2300 milli seconds on my laptop.
   
   Ok, this is reasonable. If this time is sufficient for reliably testing the 
upper limit of FPPs, it should be good enough to also check the lower limit, eg 
`exist > totalCount * (testFpp[i] * 0.9)` , or `exist > totalCount * 
(testFpp[i] * 0.5)` , or even `exist > 0`. What do you think? This way, we'll 
be certain the test passes not because `exist` is just 0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org