[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-12605:
------------------------------
    Component/s: clients

> kafka consumer churns through buffer memory iterating over records
> ------------------------------------------------------------------
>
>                 Key: KAFKA-12605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12605
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.7.0
>            Reporter: Radai Rosenblatt
>            Priority: Major
>         Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
> We recently conducted an analysis of memory allocations by the Kafka consumer and found a significant number of buffers that graduate out of the young gen, causing GC load.
>
> These are the buffers used to gunzip record batches in the consumer when polling. Since the same iterator (and the underlying streams and buffers) is likely to live through several poll() cycles, these buffers graduate out of the young gen and cause issues.
>
> See the attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!
> The code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
>             16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> This allocates two buffers (8 KB and 16 KB) even though a BufferSupplier is available to attempt reuse.
>
> I believe it is possible to obtain both of those buffers from the supplier and return them when iteration over the record batch is done.
> Doing so will require subclassing BufferedInputStream and GZIPInputStream (or its parent class) to allow supplying external buffers to them. Some lifecycle hook would also be needed to return said buffers to the pool when iteration is done.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
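The idea proposed above (a buffered stream that borrows its buffer from a pool and returns it on close) might be sketched roughly as follows. This is a hypothetical illustration, not Kafka's actual API: `PooledBufferedInputStream` and the `release` callback are invented names, and a real implementation would likely subclass `BufferedInputStream` and route allocation through Kafka's `BufferSupplier` instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: a buffered InputStream whose buffer is borrowed from
 * a pool rather than freshly allocated, and handed back via a lifecycle hook
 * (close()) when iteration over the record batch is done.
 */
class PooledBufferedInputStream extends InputStream {
    private final InputStream in;
    private byte[] buf;                     // borrowed from the pool, not owned
    private int pos = 0, limit = 0;
    private final Consumer<byte[]> release; // hook that returns the buffer to the pool

    PooledBufferedInputStream(InputStream in, byte[] borrowed, Consumer<byte[]> release) {
        this.in = in;
        this.buf = borrowed;
        this.release = release;
    }

    @Override
    public int read() throws IOException {
        if (pos >= limit) {
            // Refill the borrowed buffer from the underlying (e.g. gunzip) stream
            limit = in.read(buf, 0, buf.length);
            pos = 0;
            if (limit < 0)
                return -1;
        }
        return buf[pos++] & 0xFF;
    }

    @Override
    public void close() throws IOException {
        if (buf != null) {
            release.accept(buf); // return the buffer instead of leaving it for GC
            buf = null;
        }
        in.close();
    }
}
```

Because the buffer is returned to the pool at close(), the next poll() cycle can reuse the same array instead of allocating a fresh 16 KB block that later graduates out of the young gen.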
[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

radai rosenblatt updated KAFKA-12605:
-------------------------------------
    Description: (updated; the full description is quoted in the first message above)
[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records
[ https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

radai rosenblatt updated KAFKA-12605:
-------------------------------------
    Attachment: Screen Shot 2021-04-01 at 3.55.47 PM.png

--
This message was sent by Atlassian Jira
(v8.3.4#803005)