bbejeck commented on code in PR #18278:
URL: https://github.com/apache/kafka/pull/18278#discussion_r1911583193
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2400,32 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
return new CoordinatorResult<>(records, new
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}
+ private void addToEndpointToPartitions(Set<Map.Entry<String,
Set<Integer>>> taskEntrySet,
+ StreamsGroup group,
+
StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
+
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList) {
+ for (Map.Entry<String, Set<Integer>> taskEntry : taskEntrySet) {
+ String subtopologyId = taskEntry.getKey();
+ List<Integer> partitions = new ArrayList<>(taskEntry.getValue());
+ ConfiguredSubtopology configuredSubtopology =
group.configuredTopology().subtopologies().get(subtopologyId);
+ if (configuredSubtopology != null) {
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitions = Stream.concat(
Review Comment:
Done. The logic for calculating the `TaskPartitions` has been encapsulated
in the `EndpointToPartitionsManager` class (name is definitely up for debate)
and there's a unit test with it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]