lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3273126924
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1979,18 +2044,20 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
// Get or create the streams group.
- boolean isJoining = memberEpoch == 0;
+ boolean isJoining = memberEpoch == StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH;
StreamsGroup group;
if (isJoining) {
group = getOrCreateStreamsGroup(groupId, records);
- throwIfStreamsGroupIsFull(group);
+ throwIfStreamsGroupIsFull(group, instanceId);
} else {
group = getStreamsGroupOrThrow(groupId);
}
// Get or create the member.
StreamsGroupMember member;
+ StreamsGroupMember maybeOldMember;
if (instanceId == null) {
+ maybeOldMember = group.dynamicMember(memberId);
Review Comment:
I think `maybeOldMember` is redundant in the dynamic branch. It is consumed in
three places below:
- `hasReplacedStaticMember`: always false for a dynamic member, because
  `maybeOldMember` is either the same reference as `member` or null.
- `hasUserEndpointChanged`: gives the same result as passing `member`, since
  for existing heartbeats the two are the same reference and for first joins
  `member` is a default with no endpoint.
- `buildEndpointToPartitions`: only relevant for the replaced static case.

Could we compute the replaced static member only inside the static branch and
drop this assignment along with the `dynamicMember` accessor?
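
A rough sketch of the shape I have in mind, using simplified stand-in types
rather than the real coordinator classes. `Member`, `Group`, `Resolved`,
`resolve` and the endpoint comparison here are all hypothetical and only
illustrate that the "old member" lookup can live entirely in the static
branch; the actual signatures in `GroupMetadataManager` may differ, and
removal of the replaced member from the group is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ReplacedStaticMemberSketch {
    // Hypothetical stand-in for StreamsGroupMember; only the fields needed here.
    public record Member(String memberId, String instanceId, String userEndpoint) {}

    // Hypothetical stand-in for the group's member lookups.
    public static final class Group {
        final Map<String, Member> membersById = new HashMap<>();
        final Map<String, String> memberIdByInstanceId = new HashMap<>();

        Member staticMember(String instanceId) {
            String id = memberIdByInstanceId.get(instanceId);
            return id == null ? null : membersById.get(id);
        }

        Member getOrCreate(String memberId, String instanceId) {
            return membersById.computeIfAbsent(memberId, id -> new Member(id, instanceId, null));
        }
    }

    // The resolved member plus the static member it replaced, if any.
    public record Resolved(Member member, Member replacedStaticMember) {
        public boolean hasReplacedStaticMember() {
            return replacedStaticMember != null;
        }
    }

    public static Resolved resolve(Group group, String memberId, String instanceId) {
        if (instanceId == null) {
            // Dynamic branch: there is never a replaced member, so no extra lookup.
            return new Resolved(group.getOrCreate(memberId, null), null);
        }
        // Static branch: the only place where an old member can be replaced.
        Member existing = group.staticMember(instanceId);
        Member replaced = (existing != null && !existing.memberId().equals(memberId)) ? existing : null;
        Member member = group.getOrCreate(memberId, instanceId);
        group.memberIdByInstanceId.put(instanceId, memberId);
        return new Resolved(member, replaced);
    }

    // Compare against the replaced static member when there is one, else against `member`.
    public static boolean hasUserEndpointChanged(Resolved resolved, String requestEndpoint) {
        Member baseline = resolved.hasReplacedStaticMember()
            ? resolved.replacedStaticMember()
            : resolved.member();
        return requestEndpoint != null && !Objects.equals(requestEndpoint, baseline.userEndpoint());
    }
}
```

With this shape the dynamic branch never needs a separate "old member"
variable, and the downstream checks fall back to `member` whenever nothing was
replaced.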