squah-confluent commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3258979486


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws TaskAssignorException {
                 memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
         }
 
+        return newGroupAssignment;
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * have not changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(GroupAssignment newGroupAssignment) {
+        return buildRecords(newGroupAssignment, Optional.empty(), Optional.empty());
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * may have changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids.
+     * @param currentStaticMembers  The current static members.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Set<String> currentMemberIds,
+        Map<String, String> currentStaticMembers
+    ) {
+        return buildRecords(newGroupAssignment, Optional.of(currentMemberIds), Optional.of(currentStaticMembers));
+    }
+
+    /**
+     * Builds the records for the new target assignment.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids, if they may have changed since the assignment was built.
+     * @param currentStaticMembers  The current static members, if they may have changed since the assignment was built.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Optional<Set<String>> currentMemberIds,
+        Optional<Map<String, String>> currentStaticMembers
+    ) {
+        Set<String> memberIds = new HashSet<>(members.keySet());

Review Comment:
   Thanks for the detailed explanation! I've been thinking about this and 
trying to come up with a good solution. I'm not too happy about unconditionally 
cloning inputs when not required on the non-offloaded path. I'm also not happy 
about adding an extra class.
   
   How about:
   1. We add a `freezeInputs()` method to the builder. All the deep copying 
logic can live in this single method.
   2. We capture the thread ID in the builder's constructor. In `build###()`, 
if the calling thread's ID does not match and `freezeInputs()` was not called, 
we throw an `IllegalStateException`. This prevents most misuse while allowing 
the non-offloaded path to avoid copies.
   3. We add a test that counts `with###()` methods and compares it against a 
hardcoded value. When a new `with###()` method is added, the test fails and 
instructs the developer to update `freezeInputs()` and update the expected 
count.
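
A minimal sketch of points 1 and 2, to make the proposal concrete. `AssignmentBuilderSketch`, `withStaticMembers()`, and the `int` return value of `build()` are hypothetical stand-ins, not the real `TargetAssignmentBuilder` API; only `freezeInputs()` and the `IllegalStateException` come from the proposal above.

```java
import java.util.Map;

// Hypothetical stand-in for the real builder; field and method names are illustrative.
class AssignmentBuilderSketch {
    // Thread that constructed the builder (the non-offloaded path stays on it).
    private final long ownerThreadId = Thread.currentThread().getId();
    private Map<String, String> staticMembers = Map.of();
    private boolean frozen = false;

    AssignmentBuilderSketch withStaticMembers(Map<String, String> staticMembers) {
        // No copy here: the non-offloaded path uses the caller's map directly.
        this.staticMembers = staticMembers;
        return this;
    }

    // All copying lives in this one method.
    AssignmentBuilderSketch freezeInputs() {
        this.staticMembers = Map.copyOf(staticMembers);
        this.frozen = true;
        return this;
    }

    // Returns the number of static members, as a placeholder for a real result.
    int build() {
        if (!frozen && Thread.currentThread().getId() != ownerThreadId) {
            throw new IllegalStateException(
                "freezeInputs() must be called before building on another thread");
        }
        return staticMembers.size();
    }
}
```

With this shape, a same-thread `build()` never pays for a copy, while an offloaded `build()` either sees the frozen snapshot or fails fast.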
   
   An alternative to 3. would be to invoke all the `with###()` methods with 
reflection and then compare fields before and after `freezeInputs()`. But this 
becomes very annoying because we have to write handling for each field type 
(`Map`, `Optional`, record classes, etc.).
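
For comparison, the counting check from point 3 could be as small as the sketch below. `ExampleBuilder`, `WithMethodGuard`, and the expected count of 2 are hypothetical; in the real test the class under inspection would be the builder itself and the failure would come from the test framework's assertion.

```java
import java.util.Arrays;

// Hypothetical builder used only to demonstrate the check.
class ExampleBuilder {
    private Object members;
    private Object topology;

    ExampleBuilder withMembers(Object members) { this.members = members; return this; }
    ExampleBuilder withTopology(Object topology) { this.topology = topology; return this; }
    ExampleBuilder freezeInputs() { return this; }
}

class WithMethodGuard {
    // Update together with freezeInputs() whenever a with###() method is added.
    static final long EXPECTED_WITH_METHODS = 2;

    // Counts declared, non-synthetic methods whose name starts with "with".
    static long countWithMethods(Class<?> builderClass) {
        return Arrays.stream(builderClass.getDeclaredMethods())
            .filter(method -> !method.isSynthetic())
            .filter(method -> method.getName().startsWith("with"))
            .count();
    }

    static void check(Class<?> builderClass) {
        long actual = countWithMethods(builderClass);
        if (actual != EXPECTED_WITH_METHODS) {
            throw new AssertionError(
                "Found " + actual + " with###() methods, expected " + EXPECTED_WITH_METHODS
                    + ". Update freezeInputs() and the expected count.");
        }
    }
}
```

The check only detects that a setter was added, not that `freezeInputs()` actually copies the new field, which is the trade-off against the reflection-based comparison.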



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
