lucasbru commented on code in PR #21664:
URL: https://github.com/apache/kafka/pull/21664#discussion_r2904596421


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2067,34 +2067,41 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         // 4. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
         // replaces an existing static member.
         // The delta between the existing and the new target assignment is 
persisted to the partition.
+        Optional<TasksTuple> updatedTargetAssignment = Optional.empty();
+        boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));
+        if (groupEpoch > group.assignmentEpoch() && !initialDelayActive) {
+            updatedTargetAssignment = maybeUpdateStreamsTargetAssignment(
+                group,
+                groupEpoch,
+                Optional.of(updatedMember),
+                updatedConfiguredTopology,
+                metadataImage,
+                records,
+                currentAssignmentConfigs
+            );
+        }
+
         int targetAssignmentEpoch;
         TasksTuple targetAssignment;
-        if (groupEpoch > group.assignmentEpoch()) {
-            boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));
-            if (initialDelayActive) {
-                // During initial rebalance delay, return empty assignment to 
first joining members.
-                targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
-                targetAssignment = TasksTuple.EMPTY;
+        if (updatedTargetAssignment.isPresent()) {
+            targetAssignmentEpoch = groupEpoch;
+            targetAssignment = updatedTargetAssignment.get();
+        } else if (!initialDelayActive && group.assignmentEpoch() > 0) {
+            targetAssignmentEpoch = group.assignmentEpoch();
+            targetAssignment = 
group.targetAssignment(updatedMember.memberId());
+        } else {
 
+            if (group.isEmpty() || initialDelayActive) {
                 returnedStatus.add(
                     new Status()
                         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
                         .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")

Review Comment:
   I think we can also get here also if the initial assignment is offloaded. 
Should we reflect this in the status detail?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3937,7 +3956,7 @@ private Assignment updateTargetAssignment(
      * @param records              The list to accumulate any new records.
      * @return The new target assignment for the updated member, or EMPTY if 
no member specified.

Review Comment:
   nit: do we not want to update the `return` javadocs? I suppose this goes to 
the next PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to