ableegoldman commented on a change in pull request #10609: URL: https://github.com/apache/kafka/pull/10609#discussion_r636561268
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ##########
@@ -125,6 +130,29 @@ public int errorCode() {
         return data.errorCode();
     }
+    // For version > MIN_NAMED_TOPOLOGY_VERSION
+    private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
+        final Map<Integer, List<SubscriptionInfoData.PartitionToOffsetSum>> topicGroupIdToPartitionOffsetSum = new HashMap<>();
+        for (final Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
+            final TaskId task = taskEntry.getKey();
+            topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new ArrayList<>()).add(
+                new SubscriptionInfoData.PartitionToOffsetSum()
+                    .setPartition(task.partition)
+                    .setOffsetSum(taskEntry.getValue()));
+        }
+
+        data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
+            final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new SubscriptionInfoData.TaskOffsetSum();
+            final TaskId task = t.getKey();
+            taskOffsetSum.setTopicGroupId(task.topicGroupId);
+            taskOffsetSum.setPartition(task.partition);

Review comment:
Ah, yes. I tried to explain this with a comment on the `SubscriptionInfoData.json` schema, but I'll call it out again in the `SubscriptionInfo.java` class. Previously we encoded the offset sums as a nested "map" of <topicGroupId, <partition, offsetSum>>, where the "map" is really an array, and the array struct does not itself allow for struct types. It's just a gap in the API that no one has cared about or had time to close, not a fundamental principle. Anyway, this meant we had a TopicGroupId struct and a PartitionToOffsetSum struct, where the PartitionToOffsetSum was in turn composed of the partition and offset sum base types. I guess this was reasonable enough when there were only three base fields, but maintaining this nested array structure would mean adding more and more nested structs each time we added a field.
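To make the old nested layout concrete, here is a minimal sketch in plain Java. It assumes hypothetical stand-in classes (`NestedOffsetSumSketch`, a simplified `TaskId`, and field-only versions of `PartitionToOffsetSum` and `TaskOffsetSum`) rather than the generated `SubscriptionInfoData` classes, whose real accessors differ. It only illustrates how a <topicGroupId, <partition, offsetSum>> map ends up as an array of structs that each hold another array, mirroring the grouping loop in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java stand-ins for the generated SubscriptionInfoData
// structs; the real generated classes use setters and differ in detail.
final class NestedOffsetSumSketch {

    // Simplified task id: (topicGroupId, partition).
    static final class TaskId {
        final int topicGroupId;
        final int partition;

        TaskId(final int topicGroupId, final int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }
    }

    // Inner level of the nested "map": partition -> offset sum.
    static final class PartitionToOffsetSum {
        final int partition;
        final long offsetSum;

        PartitionToOffsetSum(final int partition, final long offsetSum) {
            this.partition = partition;
            this.offsetSum = offsetSum;
        }
    }

    // Outer level: one entry per topicGroupId, holding an array of inner entries.
    static final class TaskOffsetSum {
        final int topicGroupId;
        final List<PartitionToOffsetSum> partitionToOffsetSum;

        TaskOffsetSum(final int topicGroupId, final List<PartitionToOffsetSum> partitionToOffsetSum) {
            this.topicGroupId = topicGroupId;
            this.partitionToOffsetSum = partitionToOffsetSum;
        }
    }

    // Groups per-task offset sums by topicGroupId to emulate the nested
    // <topicGroupId, <partition, offsetSum>> map with arrays of structs.
    static List<TaskOffsetSum> encodeNested(final Map<TaskId, Long> taskOffsetSums) {
        // TreeMap keeps the outer entries ordered by topicGroupId.
        final Map<Integer, List<PartitionToOffsetSum>> byTopicGroup = new TreeMap<>();
        for (final Map.Entry<TaskId, Long> entry : taskOffsetSums.entrySet()) {
            final TaskId task = entry.getKey();
            byTopicGroup.computeIfAbsent(task.topicGroupId, id -> new ArrayList<>())
                .add(new PartitionToOffsetSum(task.partition, entry.getValue()));
        }
        final List<TaskOffsetSum> encoded = new ArrayList<>();
        for (final Map.Entry<Integer, List<PartitionToOffsetSum>> group : byTopicGroup.entrySet()) {
            encoded.add(new TaskOffsetSum(group.getKey(), group.getValue()));
        }
        return encoded;
    }
}
```

For example, tasks (0, 0), (0, 1) and (1, 0) encode to two outer entries, one for topic group 0 holding two inner partition entries and one for topic group 1 holding a single entry. Every new base field would need a place somewhere in this two-level hierarchy.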
I felt this would get too complicated and annoying to deal with, so I flattened the OffsetSum struct out to include each base field directly.
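The flattened layout can be sketched the same way, again with hypothetical stand-in classes rather than the generated ones. The `namedTopology` field is an assumption added only to show what adding one more base field looks like; it is suggested by the `MIN_NAMED_TOPOLOGY_VERSION` comment in the diff but is not shown there. Each task becomes exactly one entry, built with a stream much like the `data.setTaskOffsetSums(...)` call in the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical plain-Java stand-ins, not the generated SubscriptionInfoData classes.
final class FlatOffsetSumSketch {

    // Simplified task id; namedTopology is a hypothetical extra field
    // (null when the task does not belong to a named topology).
    static final class TaskId {
        final int topicGroupId;
        final int partition;
        final String namedTopology;

        TaskId(final int topicGroupId, final int partition, final String namedTopology) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
            this.namedTopology = namedTopology;
        }
    }

    // Flattened struct: every base field sits directly on one entry per task,
    // so adding a field means adding one member, not another nested struct.
    static final class TaskOffsetSum {
        final int topicGroupId;
        final int partition;
        final long offsetSum;
        final String namedTopology;

        TaskOffsetSum(final int topicGroupId, final int partition, final long offsetSum, final String namedTopology) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
            this.offsetSum = offsetSum;
            this.namedTopology = namedTopology;
        }
    }

    // One TaskOffsetSum per map entry, in the map's iteration order; no grouping step.
    static List<TaskOffsetSum> encodeFlat(final Map<TaskId, Long> taskOffsetSums) {
        return taskOffsetSums.entrySet().stream()
            .map(e -> new TaskOffsetSum(
                e.getKey().topicGroupId,
                e.getKey().partition,
                e.getValue(),
                e.getKey().namedTopology))
            .collect(Collectors.toList());
    }
}
```

The trade-off is a little redundancy (the topicGroupId is repeated per partition) in exchange for a schema where each new field is a single addition to one struct.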