[
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842014#comment-17842014
]
Sal Sorrentino edited comment on KAFKA-16514 at 4/29/24 2:55 PM:
-----------------------------------------------------------------
Less tacky is better. Looking at other consumer client implementations (kgo -
Golang driver specifically), the process to leave a group is:
# The member leaving the group sends a rebalance request indicating that its
partitions should be reassigned.
# The incremental rebalance process takes place, removing the partitions from
the appropriate member and assigning them to another.
# The member leaves the group via LeaveGroupRequest with its member id as
its payload (LeaveGroupRequestMember).
If the prior suggestion of using an unsubscribe accomplishes this, that would
be the way to go, but I'm not sure that it does. Also, I don't believe the
current Close with leave=true does either. My hope is the abandoned partitions
would be assigned to the member with the most up-to-date replica of each
partition, but I can neither confirm nor deny this ;) My assumption at the
moment is that the member leaves immediately regardless of the entire group
state; it does not wait for another member to build the state in the
background before leaving (my assumption could be wrong).
The problem I'm having is the scope-splosion of this bug.
Is there a reason we can't pass the member id to the leave group request in the
absence of a static member id?
{code:java}
String memberId = clientSupplier.consumer().groupMetadata().memberId(); {code}
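To make the question concrete, here is a minimal sketch of the selection logic being asked about. It is not Kafka Streams code: `LeaveIdentity` and `identityForLeave` are hypothetical names used only for illustration. The one assumption taken from the protocol is that a member entry in a leave group request can carry a dynamic member id, a static group instance id, or both.

```java
import java.util.Optional;

public class LeaveGroupIdentitySketch {

    /** Hypothetical stand-in for one member entry in a leave group request. */
    static final class LeaveIdentity {
        final String memberId;        // dynamic id assigned by the group coordinator
        final String groupInstanceId; // static id (group.instance.id), or null if unset

        LeaveIdentity(String memberId, String groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    /**
     * Picks the identity to send when leaving the group.
     * Prefers the static id when one is configured, otherwise falls back
     * to the dynamic member id. Returns empty if neither is known.
     */
    static Optional<LeaveIdentity> identityForLeave(String memberId, String groupInstanceId) {
        boolean hasStaticId = groupInstanceId != null && !groupInstanceId.isEmpty();
        boolean hasMemberId = memberId != null && !memberId.isEmpty();
        if (hasStaticId) {
            // Static member: the instance id identifies it; the member id is optional.
            return Optional.of(new LeaveIdentity(hasMemberId ? memberId : "", groupInstanceId));
        }
        if (hasMemberId) {
            // Dynamic member: fall back to the coordinator-assigned member id.
            return Optional.of(new LeaveIdentity(memberId, null));
        }
        // The member never joined, so there is nothing to remove.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // In the real client, the member id would come from
        // consumer.groupMetadata().memberId() as in the snippet above.
        Optional<LeaveIdentity> identity = identityForLeave("consumer-1-abc", null);
        System.out.println(identity.map(i -> i.memberId).orElse("<none>"));
    }
}
```

Whether the Streams close path can reach the consumer's member id at the point where the leave request is built is exactly the open question; the sketch only shows the fallback order.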
> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup
> flag.
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated
> consumer group, the supplied `leaveGroup` option seems to have no effect.
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave
> the group, immediately triggering a consumer group rebalance. In practice,
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any
> associated StateStores would be persisted to disk and that in the case of a
> rolling restart/deployment, the rebalance delay may be preferable. However,
> in our application we are using in-memory state stores and standby replicas.
> There is no benefit in delaying the rebalance in this setup and we are in
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamsConfig to
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.EXACTLY_ONCE_V2);{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)