Copilot commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2833721722
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -433,6 +445,7 @@ public void updateMember(StreamsGroupMember newMember) {
maybeUpdateTaskProcessId(oldMember, newMember);
updateStaticMember(newMember);
maybeUpdateGroupState();
+ endpointToPartitionsCache.remove(newMember.memberId());
Review Comment:
This line evicts the member's cache entry on every member update, but
according to the PR description the entry should only be invalidated when the
member's assigned tasks change. As written, the entry is also dropped when only
non-task-related fields are updated (e.g., member epoch or heartbeat
timestamp), which largely defeats the purpose of the cache. Consider using
StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember) to invalidate
the entry only when the member is new or its tasks actually change.
```suggestion
        if (oldMember == null ||
            StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember)) {
            endpointToPartitionsCache.remove(newMember.memberId());
        }
```
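To make the intended behavior concrete, here is a minimal, self-contained sketch
of conditional invalidation. Everything in it is a hypothetical stand-in, not
the real Kafka code: `CacheInvalidationSketch` and its nested `Member` replace
StreamsGroup and StreamsGroupMember, the cached value is a plain String, and
`hasAssignedTasksChanged` is assumed to simply compare the assigned task sets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for StreamsGroup; illustrates the idea only.
final class CacheInvalidationSketch {

    // Hypothetical stand-in for StreamsGroupMember with just the fields needed here.
    static final class Member {
        final String memberId;
        final int memberEpoch;
        final Set<String> assignedTasks;

        Member(String memberId, int memberEpoch, Set<String> assignedTasks) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
            this.assignedTasks = assignedTasks;
        }
    }

    private final Map<String, Member> members = new HashMap<>();
    // memberId -> cached endpoint-to-partitions payload (a String for brevity).
    private final Map<String, String> endpointToPartitionsCache = new HashMap<>();

    // Assumed semantics: true when the two members' assigned task sets differ.
    static boolean hasAssignedTasksChanged(Member oldMember, Member newMember) {
        return !Objects.equals(oldMember.assignedTasks, newMember.assignedTasks);
    }

    void updateMember(Member newMember) {
        Member oldMember = members.put(newMember.memberId, newMember);
        // Evict only for a new member or when the task assignment changed.
        if (oldMember == null || hasAssignedTasksChanged(oldMember, newMember)) {
            endpointToPartitionsCache.remove(newMember.memberId);
        }
    }

    void cacheEndpoints(String memberId, String payload) {
        endpointToPartitionsCache.put(memberId, payload);
    }

    boolean isCached(String memberId) {
        return endpointToPartitionsCache.containsKey(memberId);
    }
}
```

With this shape, an update that only bumps the member epoch leaves the cached
entry in place, while an update that changes the assigned tasks evicts it.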