mjsax commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2843895949
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1127,6 +1141,73 @@ public void setEndpointInformationEpoch(int
endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
getCachedEndpointToPartitions(
Review Comment:
```suggestion
Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cachedEndpointToPartitions(
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1127,6 +1141,73 @@ public void setEndpointInformationEpoch(int
endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
getCachedEndpointToPartitions(
+ String memberId
+ ) {
+ return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+ }
+
+ // Visible for testing
+ void cacheEndpointToPartitions(
+ String memberId,
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions
+ ) {
+ endpointToPartitionsCache.put(memberId, endpointToPartitions);
+ }
+
+ private void clearEndpointToPartitionsCache() {
+ endpointToPartitionsCache.clear();
+ }
+
+ /**
+ * Builds the endpoint-to-partitions list for all members, using the cache
where possible.
+ *
+ * @param updatedMember The member that was just updated (may have a stale
entry in the members map).
+ * @param metadataImage The current metadata image for resolving topic
partitions.
+ * @return The list of endpoint-to-partitions mappings for all members
with endpoints.
+ */
+ public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
buildEndpointToPartitions(
+ StreamsGroupMember updatedMember,
+ CoordinatorMetadataImage metadataImage
+ ) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
Review Comment:
Would it be simpler to start this method with
```
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
if (updatedMember == null) {
return endpointToPartitionsList;
}
```
This should let us remove the `updatedMember != null` checks in the body,
making the code easier to read.
But looking at the code, it seems `updatedMember` should never be `null`,
so why do we handle this case to begin with? If we want a guard, should we
throw an exception instead (not sure whether crashing the group coordinator
would be too harsh), or at least surface the error some other way (for example,
log at ERROR level with a request to file a JIRA ticket), so the problem is
visible and we avoid hidden bugs?
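To make the trade-off concrete, here is a minimal sketch of the two guard styles. It is not the PR's code: `EndpointGuardSketch`, `Member`, `buildLenient`, and `buildStrict` are hypothetical stand-ins, and the result is simplified to a list of strings instead of `EndpointToPartitions` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for the group class; not the actual StreamsGroup.
public class EndpointGuardSketch {

    // Hypothetical stand-in for StreamsGroupMember.
    public static final class Member {
        final String memberId;
        final String endpoint; // null when the member exposes no endpoint

        public Member(String memberId, String endpoint) {
            this.memberId = memberId;
            this.endpoint = endpoint;
        }
    }

    private final Map<String, Member> members = new LinkedHashMap<>();

    public void addMember(Member member) {
        members.put(member.memberId, member);
    }

    // Variant A: guard once at the top and return an empty result.
    // The loop body no longer needs any `updatedMember != null` checks,
    // but a null argument is silently tolerated.
    public List<String> buildLenient(Member updatedMember) {
        List<String> result = new ArrayList<>();
        if (updatedMember == null) {
            return result;
        }
        for (Member stored : members.values()) {
            // Prefer the freshly updated member over a possibly stale map entry.
            Member effective = stored.memberId.equals(updatedMember.memberId) ? updatedMember : stored;
            if (effective.endpoint != null) {
                result.add(effective.memberId + "@" + effective.endpoint);
            }
        }
        return result;
    }

    // Variant B: treat null as a programming error and fail fast,
    // so the bug surfaces at the call site instead of as an empty response.
    public List<String> buildStrict(Member updatedMember) {
        Objects.requireNonNull(updatedMember, "updatedMember must not be null");
        return buildLenient(updatedMember);
    }
}
```

Variant A keeps the caller running but can hide a bug behind an empty list; Variant B makes the bug visible at the cost of an exception propagating to the caller. Whether that cost is acceptable inside the group coordinator is the open question here.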
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1127,6 +1141,73 @@ public void setEndpointInformationEpoch(int
endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
getCachedEndpointToPartitions(
+ String memberId
+ ) {
+ return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+ }
+
+ // Visible for testing
+ void cacheEndpointToPartitions(
+ String memberId,
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions
+ ) {
+ endpointToPartitionsCache.put(memberId, endpointToPartitions);
+ }
+
+ private void clearEndpointToPartitionsCache() {
Review Comment:
Why do we need a method for this one-liner, especially as it's `private`?