[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2024-01-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-12605:
--
Component/s: clients

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: Radai Rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted an analysis of memory allocations by the kafka consumer 
> and found a significant number of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
>                 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }
> {code}
> it allocates 2 buffers (8 KB and 16 KB) even though a BufferSupplier is 
> available to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class, InflaterInputStream) to allow supplying external 
> buffers to them. also, some lifecycle hook would be needed to return said 
> buffers to the pool when iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-12605:
-
Description: 
we recently conducted an analysis of memory allocations by the kafka consumer and 
found a significant number of buffers that graduate out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and its underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of the 
young gen and cause issues.

 

see attached memory allocation flame graph:

!Screen Shot 2021-04-01 at 3.55.47 PM.png!  

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}
{code}
it allocates 2 buffers (8 KB and 16 KB) even though a BufferSupplier is available 
to attempt re-use.
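
for context, the BufferSupplier handed to wrapForInput already exposes a simple 
pooling contract (get/release on current trunk). a minimal, hypothetical usage 
sketch for illustration only - the class name below is invented, this is not 
consumer code:
{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

// hypothetical illustration of the pooling contract wrapForInput already receives
public class BufferSupplierUsageSketch {
    public static void main(String[] args) {
        BufferSupplier supplier = BufferSupplier.create();

        ByteBuffer decompressionBuffer = supplier.get(16 * 1024); // may hand back a cached buffer
        try {
            // ... decompress / iterate over the record batch using this buffer ...
        } finally {
            supplier.release(decompressionBuffer); // return it so the next poll() can reuse it
        }
    }
}
{code}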

 

i believe it is possible to actually get both those buffers from the supplier 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class, InflaterInputStream) to allow supplying external buffers to 
them. also, some lifecycle hook would be needed to return said buffers to the 
pool when iteration is done.
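
below is a rough, hypothetical sketch (not from the kafka codebase, class name 
invented, untested) of what the BufferedInputStream half of that could look like. 
it assumes the supplier hands back heap buffers whose backing array can be 
reused, and relies on BufferedInputStream.buf being a protected field:
{code:java}
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

// hypothetical sketch, not the actual implementation
public class PooledBufferedInputStream extends BufferedInputStream {
    private final BufferSupplier supplier;
    private final ByteBuffer pooled;

    public PooledBufferedInputStream(InputStream in, BufferSupplier supplier, int size) {
        super(in, 1);              // allocate the minimum; the pooled array is swapped in below
        this.supplier = supplier;
        this.pooled = supplier.get(size);
        this.buf = pooled.array(); // BufferedInputStream.buf is protected; assumes a heap buffer
    }

    // the "lifecycle hook": hand the buffer back to the pool once iteration is done
    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            supplier.release(pooled);
        }
    }
}
{code}
the GZIPInputStream half is trickier, since InflaterInputStream allocates its buf 
in its constructor and GZIPInputStream also reads the gzip header there, so it 
would likely mean subclassing InflaterInputStream (or adding an extra level of 
indirection) rather than GZIPInputStream itself.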

 

  was:
we recently conducted an analysis of memory allocations by the kafka consumer and 
found a significant number of buffers that graduate out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and its underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of the 
young gen and cause issues.

 

see attached memory allocation flame graph.

 

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}
{code}
it allocates 2 buffers (8 KB and 16 KB) even though a BufferSupplier is available 
to attempt re-use.

 

i believe it is possible to actually get both those buffers from the supplier 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class, InflaterInputStream) to allow supplying external buffers to 
them. also, some lifecycle hook would be needed to return said buffers to the 
pool when iteration is done.

 


[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-12605:
-
Attachment: Screen Shot 2021-04-01 at 3.55.47 PM.png

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted an analysis of memory allocations by the kafka consumer 
> and found a significant number of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph.
>  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
>                 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }
> {code}
> it allocates 2 buffers (8 KB and 16 KB) even though a BufferSupplier is 
> available to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class, InflaterInputStream) to allow supplying external 
> buffers to them. also, some lifecycle hook would be needed to return said 
> buffers to the pool when iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)