squah-confluent commented on code in PR #19967:
URL: https://github.com/apache/kafka/pull/19967#discussion_r2307416991


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
 
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+        }
+

Review Comment:
   `newGroupConfig` contains the entire set of config overrides for the group, 
while the config listener expects only the changed values. This means we'll 
trigger spurious rebalances when a group has a 
`STREAMS_NUM_STANDBY_REPLICAS_CONFIG` config override and _any_ additional 
config override is added or updated for that group.
   
   We could try to compute the diff between `newConfig` below and the existing 
config to address this.
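
   A minimal sketch of such a diff, assuming both the previous and the new 
overrides are available as `Properties`. The `GroupConfigDiff` class and its 
`changedOverrides` method are hypothetical names, not part of the Kafka code 
base, and the real comparison might need to happen on `GroupConfig` values 
instead:
   ```java
   import java.util.Objects;
   import java.util.Properties;

   // Hypothetical helper for illustration only; not part of Kafka.
   final class GroupConfigDiff {
       private GroupConfigDiff() { }

       /**
        * Returns the overrides that were added or whose value changed.
        * Removed keys are not reported here; how to surface a removal
        * (for example, as a revert to the default) is left open.
        */
       static Properties changedOverrides(Properties oldOverrides, Properties newOverrides) {
           Properties changed = new Properties();
           for (String key : newOverrides.stringPropertyNames()) {
               String newValue = newOverrides.getProperty(key);
               String oldValue = oldOverrides == null ? null : oldOverrides.getProperty(key);
               if (!Objects.equals(oldValue, newValue)) {
                   changed.setProperty(key, newValue);
               }
           }
           return changed;
       }
   }
   ```
   With something like this, the listener would only be notified when the 
returned `Properties` contains a config it cares about.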



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
 
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+        }
+

Review Comment:
   `GroupConfigManager.updateGroupConfig` is called:
   * On broker startup, when all group config overrides are replayed.
   * When new group config overrides are configured.
   
   The broker startup call is problematic because it can trigger spurious 
rebalances depending on how quickly the group coordinator loads. I can't see a 
good solution to this at the moment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1913,6 +1914,11 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         // to persist the change, and bump the group epoch later.
         boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
 
+        // Check if StreamsGroup has flag rebalanceRequired equals to true
+        if (group.rebalanceRequired()) {
+            bumpGroupEpoch = true;
+        }
+

Review Comment:
   Instead of adding a new flag, we can bump the group epoch directly when a 
config of interest changes, which will trigger a new assignment on the next 
heartbeat. `handleRegularExpressionsResult` already does this.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
 
+        if (groupConfigListener != null) {
+            groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);

Review Comment:
   If `GroupCoordinatorService.onGroupConfigUpdate` throws an exception because 
the coordinator is not active, we will not update `configMap` below.
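
   One possible way around this, sketched under assumptions: store the new 
config first, then notify the listener and contain any failure. 
`ConfigManagerSketch` is a simplified, hypothetical stand-in for 
`GroupConfigManager` that keeps raw `Properties` and throws 
`IllegalArgumentException` rather than Kafka's exception types. Whether 
swallowing the listener failure is acceptable is itself an assumption:
   ```java
   import java.util.Map;
   import java.util.Properties;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, simplified stand-in for GroupConfigManager.
   final class ConfigManagerSketch {
       interface Listener {
           void onGroupConfigUpdate(String groupId, Properties updatedProperties);
       }

       private final Map<String, Properties> configMap = new ConcurrentHashMap<>();
       private volatile Listener listener;

       void setListener(Listener listener) {
           this.listener = listener;
       }

       void updateGroupConfig(String groupId, Properties newGroupConfig) {
           if (groupId == null || groupId.isEmpty()) {
               throw new IllegalArgumentException("Group name can't be empty.");
           }
           // Store the overrides first so a failing listener cannot block the update.
           configMap.put(groupId, newGroupConfig);

           Listener current = listener;
           if (current != null) {
               try {
                   current.onGroupConfigUpdate(groupId, newGroupConfig);
               } catch (RuntimeException e) {
                   // Assumption: a listener failure is reported and otherwise ignored.
                   System.err.println("Listener failed for group " + groupId + ": " + e);
               }
           }
       }

       Properties groupConfig(String groupId) {
           return configMap.get(groupId);
       }
   }
   ```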



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -2212,6 +2212,15 @@ public void onNewMetadataImage(
         runtime.onNewMetadataImage(newImage, delta);
     }
 
+    /**
+     * See {@link GroupCoordinator#onGroupConfigUpdate(String, Properties)}.
+     */
+    @Override
+    public void onGroupConfigUpdate(String groupId, Properties updatedProperties) {
+        throwIfNotActive();
+        runtime.onGroupConfigUpdate(groupId, updatedProperties);
+    }

Review Comment:
   We can avoid changing the coordinator runtime entirely. A group is owned by 
a single shard/partition and we can get that partition using 
`topicPartitionFor(groupId)`.
   
   You can look at the `streamsGroupHeartbeat` implementation for an example.
   ```java
   runtime.scheduleWriteOperation(
       ...,
       topicPartitionFor(groupId),
       ...,
       coordinator -> coordinator.onGroupConfigUpdate(groupId, updatedProperties)
   );
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -36,6 +36,8 @@ public class GroupConfigManager implements AutoCloseable {
 
     private final Map<String, GroupConfig> configMap;
 
+    private volatile GroupCoordinator groupConfigListener;

Review Comment:
   Since we don't need the entire set of `GroupCoordinator` methods, I would 
propose defining an inner interface and having `GroupCoordinatorService` 
implement it:
   ```java
       public interface Listener {
           void onGroupConfigUpdate(String groupId, Properties updatedProperties);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org