squah-confluent commented on code in PR #21664:
URL: https://github.com/apache/kafka/pull/21664#discussion_r2911445756


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2067,34 +2067,41 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
         // replaces an existing static member.
         // The delta between the existing and the new target assignment is persisted to the partition.
+        Optional<TasksTuple> updatedTargetAssignment = Optional.empty();
+        boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
+        if (groupEpoch > group.assignmentEpoch() && !initialDelayActive) {
+            updatedTargetAssignment = maybeUpdateStreamsTargetAssignment(
+                group,
+                groupEpoch,
+                Optional.of(updatedMember),
+                updatedConfiguredTopology,
+                metadataImage,
+                records,
+                currentAssignmentConfigs
+            );
+        }
+
         int targetAssignmentEpoch;
         TasksTuple targetAssignment;
-        if (groupEpoch > group.assignmentEpoch()) {
-            boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
-            if (initialDelayActive) {
-                // During initial rebalance delay, return empty assignment to first joining members.
-                targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
-                targetAssignment = TasksTuple.EMPTY;
+        if (updatedTargetAssignment.isPresent()) {
+            targetAssignmentEpoch = groupEpoch;
+            targetAssignment = updatedTargetAssignment.get();
+        } else if (!initialDelayActive && group.assignmentEpoch() > 0) {
+            targetAssignmentEpoch = group.assignmentEpoch();
+            targetAssignment = group.targetAssignment(updatedMember.memberId());
+        } else {
 
+            if (group.isEmpty() || initialDelayActive) {
                 returnedStatus.add(
                     new Status()
                         .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
                         .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.")

Review Comment:
   After applying https://github.com/apache/kafka/pull/21664#discussion_r2905108360 it no longer makes sense to add the status in this PR.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3937,7 +3956,7 @@ private Assignment updateTargetAssignment(
      * @param records              The list to accumulate any new records.
      * @return The new target assignment for the updated member, or EMPTY if no member specified.

Review Comment:
   Thanks for the review.
   
   After applying https://github.com/apache/kafka/pull/21664#discussion_r2905108360 I'm not sure it makes sense to update the `@return` Javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.