[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337897#comment-17337897 ]

radai rosenblatt commented on KAFKA-12605:
------------------------------------------

PR filed against trunk - https://github.com/apache/kafka/pull/10624

> kafka consumer churns through buffer memory iterating over records
> ------------------------------------------------------------------
>
>                 Key: KAFKA-12605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12605
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.7.0
>            Reporter: radai rosenblatt
>            Priority: Major
>         Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
> We recently conducted an analysis of memory allocations by the kafka consumer
> and found a significant amount of buffers that graduate out of the young gen,
> causing GC load.
>
> These are the buffers used to gunzip record batches in the consumer when
> polling. Since the same iterator (and underlying streams and buffers) is
> likely to live through several poll() cycles, these buffers graduate out of
> young gen and cause issues.
>
> See the attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!
> The code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
>             16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> This allocates 2 buffers - 8K and 16K - even though a BufferSupplier is
> available to attempt re-use.
>
> I believe it is possible to actually get both those buffers from the
> supplier, and return them when iteration over the record batch is done.
> Doing so will require subclassing BufferedInputStream and GZIPInputStream
> (or its parent class) to allow supplying external buffers to them. Also,
> some lifecycle hook would be needed to return said buffers to the pool when
> iteration is done.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
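The subclassing approach described above could look roughly like the following. This is a minimal sketch, not the actual PR: `PooledBufferedInputStream` and the `release` callback are hypothetical names, and it ignores the case where `BufferedInputStream` grows its internal array on `mark()`/`fill()` (the pooled buffer would then be silently abandoned).

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

// Hypothetical sketch: a BufferedInputStream whose internal buffer comes from
// an external pool (e.g. Kafka's BufferSupplier) and is handed back on close().
// BufferedInputStream.buf is protected, so a subclass can swap in its own array.
class PooledBufferedInputStream extends BufferedInputStream {
    private final byte[] pooled;
    private final Consumer<byte[]> release; // lifecycle hook back to the pool

    PooledBufferedInputStream(InputStream in, byte[] pooled, Consumer<byte[]> release) {
        super(in, 1);      // allocate the smallest legal default buffer...
        this.buf = pooled; // ...then immediately replace it with the pooled one
        this.pooled = pooled;
        this.release = release;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            release.accept(pooled); // return the buffer when iteration is done
        }
    }
}
```

The same idea would apply to the 8K input buffer: `GZIPInputStream` inherits a `protected byte[] buf` from `InflaterInputStream`, so a subclass could overwrite it with a pooled array in the same way.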
[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314040#comment-17314040 ]

Ismael Juma commented on KAFKA-12605:
-------------------------------------

Zstd and snappy recycle buffers by themselves. Gzip is the only one with the gap. As you said, it requires subclassing or some such to make that work.
[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314037#comment-17314037 ]

radai rosenblatt commented on KAFKA-12605:
------------------------------------------

The only compression type that uses this mechanism looks to be LZ4. A very similar solution can be done for gzip.