[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687763#comment-16687763 ]
Stanislav Kozlovski commented on KAFKA-7610:
--------------------------------------------

Hey everybody,

The static membership KIP is very good and would certainly de-duplicate the consumers in this issue. What I'm concerned with is dealing with this issue in V3 and below requests, i.e. non-static memberships. I believe we should have a definable bound on what the broker will handle, so as to properly protect against buggy or even malicious clients.

1) propagating disconnects - I rather like this solution, as it gives a lot more control and clarity to the handler. Are there any downsides to this I might be missing? I guess we will be making the request handlers busier, since they will now have to handle a new event: disconnections.
2) return earlier - This gives us a bound on how many consumers can join within the session timeout. Is that the `group.max.session.timeout.ms` config you are referring to? The default is 5 minutes, which I feel is too high: a sufficiently buggy/malicious application can still introduce memory pressure. I also think that it is not ideal to use a session timeout to bound broker memory usage. It would make you trade off consumer (client) flexibility for something the server itself should handle.

I think the simplest way to avoid this issue is to outright place a bound on how much the broker will accept. Some examples I can come up with:
a) in the form of time - we have `group.initial.rebalance.delay.ms`, which can get restarted indefinitely. Maybe we could have a `group.initial.rebalance.max.delay.ms`, which would serve as the maximum amount of time we can delay the initial rebalance. Or we could use a multiplier, e.g. allow only 3x `group.initial.rebalance.delay.ms`
b) in the form of memory
c) in the form of consumer count - `group.max.size`

My thoughts on those:
a) it is not strict enough
b) it is not intuitive
c) seems to be intuitive and strict.
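
To make options (a) and (c) concrete, here is a minimal Java sketch. It is only an illustration of the idea, not Kafka's coordinator code: the class `BoundedGroupSketch` and its methods are hypothetical names, `maxSize` stands in for the proposed `group.max.size`, and `maxDelayMs` stands in for the proposed `group.initial.rebalance.max.delay.ms`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch; none of these names exist in the Kafka code base.
final class BoundedGroupSketch {
    private final int maxSize; // stands in for the proposed `group.max.size`
    private final Set<String> members = new LinkedHashSet<>();

    BoundedGroupSketch(int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
    }

    /** Option (c): admit a new member only while the group is below its bound. */
    synchronized boolean tryJoin(String memberId) {
        if (members.contains(memberId)) {
            return true; // a known member re-joining does not grow the group
        }
        if (members.size() >= maxSize) {
            return false; // the broker would reject the join with an error
        }
        members.add(memberId);
        return true;
    }

    synchronized void remove(String memberId) {
        members.remove(memberId);
    }

    synchronized int size() {
        return members.size();
    }

    /**
     * Option (a): the next extension of the initial rebalance delay, capped so
     * that the total delay never exceeds maxDelayMs.
     */
    static long nextDelayMs(long elapsedMs, long delayMs, long maxDelayMs) {
        long remaining = Math.max(0L, maxDelayMs - elapsedMs);
        return Math.min(delayMs, remaining);
    }
}
```

With `maxSize = 2`, a third distinct member is refused until one of the first two is removed. With a 3000 ms delay and a 9000 ms cap (the 3x multiplier), `nextDelayMs` returns 3000 at the start, 2000 once 7000 ms have elapsed, and 0 from 9000 ms onward.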

In the current model, large consumer groups are unstable because rebalances tend to be disruptive and long-lived. Users do not necessarily know this and sometimes create large consumer groups, only for that to come back and bite them. I think providing documentation and a proper default value would alleviate some of these problems, or at the very least educate people. With KIP-345's static membership, that should be mitigated and large consumer groups should be more approachable. But since it is common to have a newer broker version and older clients, I think it would still prove a useful config to guard against dynamic membership.

Thoughts on this?

> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
>                 Key: KAFKA-7610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7610
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect
> failures after a consumer joins the group. However, we have no mechanism to
> detect failures during a consumer's initial JoinGroup when its memberId is
> empty. When a client fails (e.g. due to a disconnect), the newly created
> MemberMetadata will be left in the group metadata cache. Typically when this
> happens, the client simply retries the JoinGroup. Every retry results in a
> new dangling member created and left in the group. These members are doomed
> to a session timeout when the group finally finishes the rebalance, but
> before that time, they are occupying memory. In extreme cases, when a
> rebalance is delayed (possibly due to a buggy application), this cycle can
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP
> connection fails.
> This is difficult at the moment because we do not have a
> mechanism to propagate disconnects from the network layer. A potential option
> is to treat the disconnect as just another type of request and pass it to the
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of
> time, we can return earlier with the generated memberId and an error code
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the
> rebalance. The consumer can then poll for the rebalance using its assigned
> memberId. And we can detect failures through the session timeout. Obviously
> this option requires a KIP (and some more thought).
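
The failure mode in the quoted description, and the effect of option 2, can be illustrated with a toy model. This is a sketch under simplifying assumptions, not Kafka's actual coordinator logic: `JoinGroupSketch`, its `join` method, and the counter-based member ids are hypothetical, and a plain map stands in for the group metadata cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the group metadata cache; not Kafka's coordinator code.
final class JoinGroupSketch {
    // memberId -> clientId, standing in for the cached MemberMetadata entries
    private final Map<String, String> members = new HashMap<>();
    private int nextId = 0;

    /** An empty memberId always creates a new entry; a known memberId reuses its entry. */
    String join(String memberId, String clientId) {
        String id = memberId.isEmpty() ? clientId + "-" + (nextId++) : memberId;
        members.put(id, clientId);
        return id;
    }

    int cachedMembers() {
        return members.size();
    }

    public static void main(String[] args) {
        // The client disconnects before any response arrives, so every retry
        // is sent with an empty memberId and leaves another dangling entry.
        JoinGroupSketch dangling = new JoinGroupSketch();
        for (int i = 0; i < 5; i++) {
            dangling.join("", "consumer-1");
        }
        System.out.println(dangling.cachedMembers()); // prints 5

        // Option 2: the first response returns the generated memberId early,
        // and the client reuses it on every retry.
        JoinGroupSketch early = new JoinGroupSketch();
        String id = early.join("", "consumer-1");
        for (int i = 0; i < 4; i++) {
            id = early.join(id, "consumer-1");
        }
        System.out.println(early.cachedMembers()); // prints 1
    }
}
```

The model leaves out the session timeout that would eventually expire the dangling entries; it only shows why the cache grows with each retry while the rebalance is still pending.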