ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636561268



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -125,6 +130,29 @@ public int errorCode() {
         return data.errorCode();
     }
 
+    // For version > MIN_NAMED_TOPOLOGY_VERSION
+    private void 
setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, 
Long> taskOffsetSums) {
+        final Map<Integer, List<SubscriptionInfoData.PartitionToOffsetSum>> 
topicGroupIdToPartitionOffsetSum = new HashMap<>();
+        for (final Map.Entry<TaskId, Long> taskEntry : 
taskOffsetSums.entrySet()) {
+            final TaskId task = taskEntry.getKey();
+            
topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new 
ArrayList<>()).add(
+                    new SubscriptionInfoData.PartitionToOffsetSum()
+                            .setPartition(task.partition)
+                            .setOffsetSum(taskEntry.getValue()));
+        }
+
+        data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
+            final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new 
SubscriptionInfoData.TaskOffsetSum();
+            final TaskId task = t.getKey();
+            taskOffsetSum.setTopicGroupId(task.topicGroupId);
+            taskOffsetSum.setPartition(task.partition);

Review comment:
       Ah, yes. I tried to explain this with a comment on the 
`SubscriptionInfoData.json` schema but I'll call it out again in the 
`SubscriptionInfo.java` class. Previously we encoded the offset sums as a 
nested "map" of <topicGroupId, <partition, offsetSum>>, where the "map" is 
really an array and the array struct does not itself allow for struct types. 
It's just a gap in the API that no one has cared or had time to close, not a 
fundamental principle. Anyways this meant we had a TopicGroupId and a 
PartitionToOffsetSum struct, where in turn the PartitionToOffsetSum was 
composed of the partition and offset sum base types.
   
   I guess this was reasonable enough when there were only 3 base fields, but 
if we wanted to maintain this nested array structure it would mean adding more 
and more nested structs each time we added a field. I felt this would get to be 
too complicated and annoying to deal with so I flattened the OffsetSum struct 
out to just include each base field directly




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to