lucasbru commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2846008269
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1127,6 +1141,73 @@ public void setEndpointInformationEpoch(int endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> getCachedEndpointToPartitions(
+ String memberId
+ ) {
+ return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+ }
+
+ // Visible for testing
+ void cacheEndpointToPartitions(
+ String memberId,
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions
+ ) {
+ endpointToPartitionsCache.put(memberId, endpointToPartitions);
+ }
+
+ private void clearEndpointToPartitionsCache() {
+ endpointToPartitionsCache.clear();
+ }
+
+ /**
+ * Builds the endpoint-to-partitions list for all members, using the cache where possible.
+ *
+ * @param updatedMember The member that was just updated (may have a stale entry in the members map).
+ * @param metadataImage The current metadata image for resolving topic partitions.
+ * @return The list of endpoint-to-partitions mappings for all members with endpoints.
+ */
+ public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> buildEndpointToPartitions(
+ StreamsGroupMember updatedMember,
+ CoordinatorMetadataImage metadataImage
+ ) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
Review Comment:
Done
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1127,6 +1141,73 @@ public void setEndpointInformationEpoch(int endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> getCachedEndpointToPartitions(
+ String memberId
+ ) {
+ return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+ }
+
+ // Visible for testing
+ void cacheEndpointToPartitions(
+ String memberId,
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions
+ ) {
+ endpointToPartitionsCache.put(memberId, endpointToPartitions);
+ }
+
+ private void clearEndpointToPartitionsCache() {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]