squah-confluent commented on code in PR #19967:
URL: https://github.com/apache/kafka/pull/19967#discussion_r2307416991

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+        }
+

Review Comment:
   `newGroupConfig` contains the entire set of config overrides for the group, while the config listener expects only the changed values. This means we'll trigger spurious rebalances when a group has a `STREAMS_NUM_STANDBY_REPLICAS_CONFIG` config override and _any_ additional config override is added or updated for that group.

   We could try to compute the diff between `newConfig` below and the existing config to address this.


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+        }
+

Review Comment:
   `GroupConfigManager.updateGroupConfig` is called:
   * On broker startup, when all group config overrides are replayed.
   * When new group config overrides are configured.

   The broker startup call is problematic because it can trigger spurious rebalances depending on how quickly the group coordinator loads. I can't see a good solution to this at the moment.


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1913,6 +1914,11 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         // to persist the change, and bump the group epoch later.
         boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
+        // Check if StreamsGroup has flag rebalanceRequired equals to true
+        if (group.rebalanceRequired()) {
+            bumpGroupEpoch = true;
+        }
+

Review Comment:
   Instead of adding a new flag, we can bump the epoch directly when a config of interest changes and this will trigger a new assignment on the next heartbeat. We do this in `handleRegularExpressionsResult` to trigger a new assignment on the next heartbeat.


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);

Review Comment:
   If `GroupCoordinatorService.onGroupConfigUpdate` throws an exception when not active, we will not update `configMap` below.


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -2212,6 +2212,15 @@ public void onNewMetadataImage(
         runtime.onNewMetadataImage(newImage, delta);
     }
+    /**
+     * See {@link GroupCoordinator#onGroupConfigUpdate(String, Properties)}.
+     */
+    @Override
+    public void onGroupConfigUpdate(String groupId, Properties updatedProperties) {
+        throwIfNotActive();
+        runtime.onGroupConfigUpdate(groupId, updatedProperties);
+    }

Review Comment:
   We can avoid changing the coordinator runtime entirely. A group is owned by a single shard/partition and we can get that partition using `topicPartitionFor(groupId)`. You can look at the `streamsGroupHeartbeat` implementation for an example.
   ```java
   runtime.scheduleWriteOperation(
       ...,
       topicPartitionFor(groupId),
       ...,
       coordinator -> coordinator.onGroupConfigUpdate(groupId, updatedProperties)
   );
   ```


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -36,6 +36,8 @@ public class GroupConfigManager implements AutoCloseable {
     private final Map<String, GroupConfig> configMap;
+    private volatile GroupCoordinator groupConfigListener;

Review Comment:
   Since we don't need the entire set of `GroupCoordinator` methods, I would propose defining an inner interface and have `GroupCoordinatorService` implement it:
   ```java
   public interface Listener {
       void onGroupConfigUpdate(String groupId, Properties updatedProperties);
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
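
The diff suggested in the first comment on `GroupConfigManager.updateGroupConfig` could look roughly like the sketch below. It assumes the previous overrides for the group are available as a `Properties` object. `GroupConfigDiff` and `changedKeys` are hypothetical names that do not exist in Kafka, and the sketch only compares string-valued overrides.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka: computes which group config
// overrides actually changed between two snapshots.
final class GroupConfigDiff {
    private GroupConfigDiff() { }

    /**
     * Returns the keys whose value was added, removed, or modified
     * between oldConfig and newConfig.
     */
    static Set<String> changedKeys(Properties oldConfig, Properties newConfig) {
        // Look at every key present in either snapshot.
        Set<String> allKeys = new HashSet<>(oldConfig.stringPropertyNames());
        allKeys.addAll(newConfig.stringPropertyNames());

        // TreeSet only to make the result order deterministic.
        Set<String> changed = new TreeSet<>();
        for (String key : allKeys) {
            // getProperty returns null for a missing key, so added and
            // removed overrides compare as unequal here.
            if (!Objects.equals(oldConfig.getProperty(key), newConfig.getProperty(key))) {
                changed.add(key);
            }
        }
        return changed;
    }
}
```

With something along these lines, the listener would only be notified about `STREAMS_NUM_STANDBY_REPLICAS_CONFIG` when that key appears in the returned set, so adding an unrelated override to the same group would not trigger a rebalance. It does not address the broker startup replay described in the second comment.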