[jira] [Comment Edited] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-08-05 Thread Adam Binford (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575830#comment-17575830
 ] 

Adam Binford edited comment on PARQUET-2160 at 8/5/22 1:11 PM:
---

{quote}Which Parquet version are you using? There are some fix patches 
([https://github.com/apache/parquet-mr/pull/903] and 
[https://github.com/apache/parquet-mr/pull/889]) released in 1.12.3.
{quote}
Yeah, this is in Spark 3.3.0, so Parquet 1.12.2. It looks like 
[https://github.com/apache/parquet-mr/pull/889] made it into 1.12.2, so the 
buffer pool is the main remaining difference. I tried dropping in 1.12.3, and 
enabling the buffer pool in 1.12.2, and both still exhibit the same issue. The 
reason I can generate so much off-heap usage (> 1 GB in a few seconds) is that I 
have a very wide table (1k+ columns) that is mostly strings (not sure if that 
makes a difference), so it's probably creating a _lot_ of {{ZstdInputStream}}s 
when reading all of the columns. Selecting only some of the columns isn't as 
noticeable, but usage still slowly grows over time.

I compiled this suggested fix myself and tested it out, and it did in fact 
completely fix my problem. What was generating GBs of off-heap memory that 
never got cleaned up (and dozens of GB of virtual memory) now consistently 
stays around ~100 MB. Looking at {{BytesInput}}, I also agree that no extra 
copy of the actual data is made by using {{BytesInput.copy}}, because either 
way the data is loaded into a single {{byte[]}} at some point, just a little 
earlier with the copy method. The only overhead is the additional 
{{BytesInput}} Java object.
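
To make the comparison concrete, here is a rough sketch of the two paths, reusing 
the variables from the snippet in the issue description ({{codec}}, {{bytes}}, 
{{decompressor}}, {{uncompressedSize}}). It only illustrates the timing 
difference; it is not the actual parquet-mr code:
{code:java}
// Lazy (current behaviour): the decompression stream stays open until the
// page is actually drained into a byte[], so releasing the native zstd
// resources depends on GC finalizing the stream.
InputStream lazyStream = codec.createInputStream(bytes.toInputStream(), decompressor);
BytesInput lazy = BytesInput.from(lazyStream, uncompressedSize);
// ... later: lazy.toByteArray() reads the stream and fills the byte[]

// Eager (proposed fix): BytesInput.copy drains the stream into the byte[]
// right away, so the stream can be closed immediately. Same single byte[],
// just filled earlier.
InputStream eagerStream = codec.createInputStream(bytes.toInputStream(), decompressor);
BytesInput eager = BytesInput.copy(BytesInput.from(eagerStream, uncompressedSize));
eagerStream.close(); {code}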



> Close decompression stream to free off-heap memory in time
> --
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
>  Issue Type: Improvement
> Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 
> 1.4.9.1 + glibc
> Reporter: Yujiang Zhong
> Priority: Major
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies on 
> JVM GC to be closed. When reading Parquet in zstd compressed format, I sometimes 
> ran into OOMs caused by high off-heap usage. I think the reason is that GC is 
> not timely and causes off-heap memory fragmentation. I had to set a lower 
> MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly. 
> There is a 
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789=C025PH0G1D4]
>  about this zstd Parquet issue in the Iceberg community Slack: some people had 
> the same problem. 
> I think maybe we can use ByteArrayBytesInput as the decompressed bytes input and 
> close the decompressed stream in time to solve this problem:
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize); {code}
> ->
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close(); {code}
> After I made this change to decompress, I found that off-heap memory usage is 
> significantly reduced (with the same query on the same data).
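> The same change could also be written with try-with-resources, so the stream is 
> closed even if the copy throws:
> {code:java}
> try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
>   decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> } {code}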



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-08-05 Thread Yujiang Zhong (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575528#comment-17575528
 ] 

Yujiang Zhong edited comment on PARQUET-2160 at 8/5/22 6:00 AM:


[~Kimahriman] I didn't reproduce this with a pure Spark Parquet table. Which 
Parquet version are you using? There are some fix patches 
([https://github.com/apache/parquet-mr/pull/903] and 
[https://github.com/apache/parquet-mr/pull/889]) released in 1.12.3.

 

 





[jira] [Comment Edited] (PARQUET-2160) Close decompression stream to free off-heap memory in time

2022-08-04 Thread Yujiang Zhong (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575529#comment-17575529
 ] 

Yujiang Zhong edited comment on PARQUET-2160 at 8/5/22 2:50 AM:


{quote}What about query performance? Was it affected too, since it now incurs an 
extra copy?
{quote}
[~csun] 

It does not actually add an extra copy: {{BytesInput#copy}} reads the bytes from 
the input stream and returns them as a {{ByteArrayBytesInput}}, and those bytes 
are later wrapped in a {{ByteBuffer}} to construct a {{ByteBufferInputStream}}. 
That is the same as the current process, just with the bytes read in advance. I 
ran some queries to test (not a strict benchmark) and the results show no 
performance loss.
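
In other words, {{BytesInput.copy}} just moves the read forward in time. A 
simplified sketch of the equivalent data path, using the variables from the 
snippets in the issue description (this is an illustration, not the real 
parquet-mr internals):
{code:java}
// Simplified illustration of BytesInput.copy(BytesInput.from(is, uncompressedSize));
// not the real implementation, just the data path described above.
byte[] buf = new byte[uncompressedSize];
new DataInputStream(is).readFully(buf);          // bytes are read in advance
BytesInput decompressed = BytesInput.from(buf);  // ByteArrayBytesInput over buf
is.close();                                      // native stream can be freed now

// The later consumer wraps the same bytes in a ByteBuffer; the page data is
// not copied again compared to the lazy path.
ByteBuffer page = ByteBuffer.wrap(buf); {code}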


