[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687763#comment-16687763 ]

Stanislav Kozlovski commented on KAFKA-7610:
--------------------------------------------

Hey everybody,

The static membership KIP is very good and would certainly de-duplicate the 
consumers in this issue. What I'm concerned with is how we deal with this issue 
for v3 and below JoinGroup requests - i.e. non-static memberships.

I believe we should have a configurable bound on what the broker will accept, 
so as to properly protect against buggy or even malicious clients.

1) propagating disconnects - I rather like this solution, as it gives the 
handler a lot more control and clarity. Are there any downsides to this that I 
might be missing? I guess we will be making the request handlers busier, since 
they will now have to handle a new type of event - disconnections (a rough 
sketch of the idea follows below).
2) return earlier - This gives us a bound, via the session timeout, on how long 
dangling consumers can pile up. Is that the `group.max.session.timeout.ms` 
config you are referring to? Its default of 5 minutes feels too high - a 
sufficiently buggy/malicious application can still introduce memory pressure. I 
also think it is not ideal to use a session timeout to bound broker memory 
usage: it would make you trade off consumer (client) flexibility for something 
the server itself should handle (see the consumer config sketch below for how 
the two timeouts relate).
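
To make 1) a bit more concrete, here is a minimal, purely illustrative Java 
sketch of the "treat the disconnect as just another type of request" idea. 
None of these class names exist in the broker; it is only meant to show the 
shape of the change: the network layer enqueues a synthetic disconnect event 
into the same queue the request handler threads already drain, so the 
coordinator gets a chance to clean up dangling members.

{code:java}
// Purely illustrative sketch - none of these class names exist in the broker.
// The idea: a client disconnect becomes just another event type flowing
// through the same queue the request handler threads already poll, so the
// group coordinator can drop dangling MemberMetadata for that connection.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

interface ChannelEvent {}

// A normal request read from a client connection.
record IncomingRequest(String connectionId, byte[] payload) implements ChannelEvent {}

// A synthetic "request" enqueued by the network layer when the connection drops.
record Disconnect(String connectionId) implements ChannelEvent {}

class RequestHandler implements Runnable {
    private final BlockingQueue<ChannelEvent> requestQueue = new LinkedBlockingQueue<>();

    void enqueue(ChannelEvent event) {
        requestQueue.add(event);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ChannelEvent event = requestQueue.take();
                if (event instanceof Disconnect disconnect) {
                    // e.g. remove any pending JoinGroup member tied to this connection
                    handleDisconnect(disconnect.connectionId());
                } else if (event instanceof IncomingRequest request) {
                    handleRequest(request);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void handleDisconnect(String connectionId) { /* coordinator cleanup */ }

    private void handleRequest(IncomingRequest request) { /* normal dispatch */ }
}
{code}

And for reference on 2), the configs in play are the consumer's 
`session.timeout.ms` and the broker's `group.min.session.timeout.ms` / 
`group.max.session.timeout.ms` bounds. A hedged sketch of the consumer side 
(standard Java client API; bootstrap address, group id and timeout value are 
placeholders):

{code:java}
// The consumer picks its own session.timeout.ms, but the broker only accepts
// values inside [group.min.session.timeout.ms, group.max.session.timeout.ms];
// otherwise the JoinGroup fails with INVALID_SESSION_TIMEOUT. Bounding broker
// memory through these therefore means tightening a client-facing knob.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SessionTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Must fall within the broker's min/max session timeout bounds.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
{code}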

I think the simplest way to avoid this issue is to outright place a bound on 
how much the broker will accept. Some examples I can come up with:
a) in the form of time - we have `group.initial.rebalance.delay.ms` which can 
get infinitely restarted. Maybe we could have a 
`group.initial.rebalance.max.delay.ms` which would serve as a maximum amount of 
time we can delay the initial rebalance. Or we could use a multiplier - e.g. 
allow only 3x `group.initial.rebalance.delay.ms`.
b) in the form of memory
c) in the form of consumer count - `group.max.size`

My thoughts on those:
a) it is not strict enough
b) it is not intuitive
c) seems to be intuitive and strict. In the current model, large consumer 
groups are unstable because rebalances tend to be disruptive and long-lived. 
Users do not necessarily know this, and sometimes create large consumer groups 
only to have that decision come back and bite them. I think providing 
documentation and a proper default value would alleviate some of these 
problems, and at the very least it would certainly educate people (a rough 
sketch of such a broker-side cap follows below).
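
To make c) a bit more concrete, here is a minimal, hypothetical sketch of what 
such a cap could look like - the class name, structure and the `group.max.size` 
config itself are all made up for illustration, this is not the coordinator's 
actual code:

{code:java}
// Hypothetical sketch of option c): before creating a new (empty-memberId)
// MemberMetadata entry, the coordinator checks a configured cap and rejects
// the JoinGroup outright once the group is full.
import java.util.HashMap;
import java.util.Map;

class GroupSizeGuard {
    // Hypothetical broker-level config, e.g. "group.max.size"
    private final int groupMaxSize;
    private final Map<String, Integer> memberCounts = new HashMap<>();

    GroupSizeGuard(int groupMaxSize) {
        this.groupMaxSize = groupMaxSize;
    }

    // Called when a JoinGroup with an empty memberId arrives. Returns false
    // (i.e. reject with some "group is full" error) once the group already
    // holds groupMaxSize members, so dangling members cannot grow unbounded.
    synchronized boolean tryAdmitNewMember(String groupId) {
        int current = memberCounts.getOrDefault(groupId, 0);
        if (current >= groupMaxSize) {
            return false;
        }
        memberCounts.put(groupId, current + 1);
        return true;
    }

    // Called when a member leaves, is expired by the session timeout, or its
    // dangling JoinGroup entry is cleaned up.
    synchronized void onMemberRemoved(String groupId) {
        memberCounts.computeIfPresent(groupId, (id, count) -> count > 1 ? count - 1 : null);
    }
}
{code}

The real check would of course live inside the GroupCoordinator and count the 
group's MemberMetadata entries directly; the point is only that the check is 
cheap and gives a hard, intuitive bound regardless of how the group got into a 
rebalance loop.
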
With KIP-345's static membership, that should be mitigated and large consumer 
groups should become more approachable. But since it is common to run a newer 
broker version with older clients, I think this would still prove to be a 
useful config to guard against dynamic membership.
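
For completeness, and assuming KIP-345 lands roughly as currently proposed, 
opting into static membership would be a purely client-side setting (the 
`group.instance.id` name below is taken from the proposal and may still 
change), which is exactly why older clients would keep hitting the dynamic 
path:

{code:java}
// Assumes the KIP-345 proposal: a stable per-instance id means restarts of the
// same instance are not treated as brand-new dynamic members. Older clients
// that never set it keep using dynamic membership, which is why a broker-side
// cap would still be useful. Bootstrap address and ids are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StaticMembershipExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Proposed by KIP-345 - not yet part of the released consumer API.
        props.put("group.instance.id", "consumer-instance-1");
        // ... remaining consumer configs and poll loop as usual
    }
}
{code}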

Thoughts on this?

> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
>                 Key: KAFKA-7610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7610
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).


