squah-confluent commented on code in PR #21664:
URL: https://github.com/apache/kafka/pull/21664#discussion_r2911445756
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2067,34 +2067,41 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// 4. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is
persisted to the partition.
+ Optional<TasksTuple> updatedTargetAssignment = Optional.empty();
+ boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(groupId));
+ if (groupEpoch > group.assignmentEpoch() && !initialDelayActive) {
+ updatedTargetAssignment = maybeUpdateStreamsTargetAssignment(
+ group,
+ groupEpoch,
+ Optional.of(updatedMember),
+ updatedConfiguredTopology,
+ metadataImage,
+ records,
+ currentAssignmentConfigs
+ );
+ }
+
int targetAssignmentEpoch;
TasksTuple targetAssignment;
- if (groupEpoch > group.assignmentEpoch()) {
- boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(groupId));
- if (initialDelayActive) {
- // During initial rebalance delay, return empty assignment to
first joining members.
- targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
- targetAssignment = TasksTuple.EMPTY;
+ if (updatedTargetAssignment.isPresent()) {
+ targetAssignmentEpoch = groupEpoch;
+ targetAssignment = updatedTargetAssignment.get();
+ } else if (!initialDelayActive && group.assignmentEpoch() > 0) {
+ targetAssignmentEpoch = group.assignmentEpoch();
+ targetAssignment =
group.targetAssignment(updatedMember.memberId());
+ } else {
+ if (group.isEmpty() || initialDelayActive) {
returnedStatus.add(
new Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
Review Comment:
After applying
https://github.com/apache/kafka/pull/21664#discussion_r2905108360 it no longer
makes sense to add the status in this PR.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3937,7 +3956,7 @@ private Assignment updateTargetAssignment(
* @param records The list to accumulate any new records.
* @return The new target assignment for the updated member, or EMPTY if
no member specified.
Review Comment:
Thanks for the review.
After applying
https://github.com/apache/kafka/pull/21664#discussion_r2905108360 I'm not sure
it makes sense to update the `return` javadocs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]