[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399395#comment-17399395 ]
Matthias J. Sax commented on KAFKA-13152:
-----------------------------------------

The current config is per-partition. They want a per-client config, i.e., the provided value must be distributed over all threads/tasks/partitions. Also note that, because threads can be added/removed dynamically and the task (and thus partition) assignment may change during a rebalance, we need to dynamically adjust the per-partition limit on any such event. We would pause any partition that exceeds its "quota" (which is also what we already do right now), so nothing must be changed there.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum
> number of records to bookkeep per partition; once it is exceeded, we pause
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the
>   dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should
> consider deprecating that config in favor of a global one, e.g.
> "input.buffer.max.bytes", which controls how many bytes in total are allowed
> to be buffered. This is doable since we buffer the raw records as
> <byte[], byte[]>.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
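To illustrate the comment's idea, here is a minimal, hypothetical sketch (not Kafka Streams' actual implementation) of how a global byte budget could be redistributed over the currently assigned partitions on each rebalance, with partitions paused while they exceed their share. The class and method names (`BufferQuota`, `onAssignmentChange`, `shouldPause`) are invented for illustration:

```java
// Hypothetical sketch: spread a global "input.buffer.max.bytes"-style
// budget evenly over the currently assigned partitions, and pause any
// partition whose buffered bytes exceed its per-partition share.
class BufferQuota {
    private final long maxBytesTotal;   // the global, per-client budget
    private long perPartitionLimit;     // recomputed on every rebalance

    BufferQuota(long maxBytesTotal) {
        this.maxBytesTotal = maxBytesTotal;
    }

    // Called whenever a rebalance (or adding/removing a thread) changes
    // the number of assigned partitions.
    void onAssignmentChange(int assignedPartitions) {
        this.perPartitionLimit = assignedPartitions == 0
            ? maxBytesTotal
            : maxBytesTotal / assignedPartitions;
    }

    long perPartitionLimit() {
        return perPartitionLimit;
    }

    // A partition is paused while its buffered bytes exceed its share;
    // the comment notes this pause mechanism already exists today.
    boolean shouldPause(long bufferedBytesForPartition) {
        return bufferedBytesForPartition > perPartitionLimit;
    }
}
```

For example, with a 1024-byte budget and 4 assigned partitions, each partition gets a 256-byte share, and a partition buffering 300 bytes would be paused until it drains below its limit.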