[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400641#comment-17400641 ]
Guozhang Wang commented on KAFKA-13152:
---------------------------------------

[~sagarrao] [~mjsax] (also cc [~desai.p.rohan]) I had a slightly different idea (probably a more complex one, admittedly) when filing the ticket. It is indeed a global config controlling the total number of bytes used for buffering source partition records, but it would not be distributed across all threads / tasks. Instead we just monitor the aggregated total bytes across all tasks within the instance, and when the threshold has been reached we can consider several options:

1) Pause all partitions, and then resume all partitions once the total has dropped back below the threshold. I'm not sure whether this would result in much "thrashing" between pausing and resuming, but since these actions are quite cheap anyway I'm not too worried about that.

2) Pause some partitions, e.g. one heuristic is to pick the partition with the most buffered bytes, and then resume all paused partitions once the total has dropped back below the threshold.

Personally I'm leaning towards 1) for now for simplicity, and we can consider whether it is sufficient after observing its behavior in production later.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of records
> to buffer per partition, and once it is exceeded we pause fetching from
> that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> And hence it's hard to bound the memory usage for this buffering.
> We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes",
> which controls how many bytes in total are allowed to be buffered. This is
> doable since we buffer the raw records as <byte[], byte[]> pairs.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
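For illustration, option 1) from the comment above could be sketched roughly as below. This is a minimal standalone sketch, not the actual Kafka Streams internals: the class, method, and field names are all hypothetical, and the pause/resume calls to the consumer are stubbed out as comments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of option 1): pause all partitions once the
// instance-wide buffered-bytes total reaches the configured maximum
// (the proposed "input.buffer.max.bytes"), and resume them all once
// the total drops back below it.
class InputBufferGate {
    private final long maxBytes;
    private final Map<String, Long> bytesByPartition = new HashMap<>();
    private boolean paused = false;

    InputBufferGate(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    private long totalBytes() {
        return bytesByPartition.values().stream().mapToLong(Long::longValue).sum();
    }

    // Called when raw <byte[], byte[]> records are buffered for a partition.
    void onBuffered(String partition, long bytes) {
        bytesByPartition.merge(partition, bytes, Long::sum);
        if (!paused && totalBytes() >= maxBytes) {
            paused = true;   // in real code: consumer.pause(allAssignedPartitions)
        }
    }

    // Called when buffered records for a partition are processed and freed.
    void onProcessed(String partition, long bytes) {
        bytesByPartition.merge(partition, -bytes, Long::sum);
        if (paused && totalBytes() < maxBytes) {
            paused = false;  // in real code: consumer.resume(allPausedPartitions)
        }
    }

    boolean isPaused() {
        return paused;
    }
}
```

Note that because the total is aggregated across all tasks, a single large partition can push the instance over the threshold and pause everything, which is exactly the simplicity/thrashing trade-off discussed above.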