squah-confluent commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3259101874
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws TaskAssignorException {
memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
}
+ return newGroupAssignment;
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of members and static members
+ * have not changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(GroupAssignment newGroupAssignment) {
+ return buildRecords(newGroupAssignment, Optional.empty(), Optional.empty());
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of members and static members
+ * may have changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids.
+ * @param currentStaticMembers The current static members.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Set<String> currentMemberIds,
+ Map<String, String> currentStaticMembers
+ ) {
+ return buildRecords(newGroupAssignment, Optional.of(currentMemberIds), Optional.of(currentStaticMembers));
+ }
+
+ /**
+ * Builds the records for the new target assignment.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids, if they may have changed since the assignment was built.
+ * @param currentStaticMembers The current static members, if they may have changed since the assignment was built.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Optional<Set<String>> currentMemberIds,
+ Optional<Map<String, String>> currentStaticMembers
+ ) {
+ Set<String> memberIds = new HashSet<>(members.keySet());
Review Comment:
> 1. We add a freezeInputs() method to the builder. All the deep copying logic can live in this single method.
> 2. We capture the thread ID in the builder's constructor. In build###(), if the thread id does not match, we throw an IllegalStateException if freezeInputs() was not called. This prevents most misuse while allowing the non-offloaded path to avoid copies.
> 3. We add a test that counts with###() methods and compares it against a hardcoded value. When a new with###() method is added, the test fails and instructs the developer to update freezeInputs() and update the expected count.

I think the safety-proofing would fit best in a follow-up PR. I'd like to
keep these PRs on the smaller side.
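
For that follow-up, here is a rough sketch of how the three quoted points could fit together. Everything in it is hypothetical: `GuardedBuilder`, its two `with...()` setters, and `countWithMethods()` are made-up stand-ins, not the real `TargetAssignmentBuilder` API. It also compares `Thread` references instead of numeric thread ids, and it uses shallow `Map.copyOf` copies, which is only enough here because the values are immutable strings.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical stand-in for the builder; not the actual TargetAssignmentBuilder.
final class GuardedBuilder {
    // Point 2: remember which thread constructed the builder.
    // Assumption: a Thread reference is compared instead of a numeric thread id.
    private final Thread ownerThread = Thread.currentThread();

    private Map<String, String> members = Map.of();
    private Map<String, String> staticMembers = Map.of();
    private boolean frozen = false;

    GuardedBuilder withMembers(Map<String, String> members) {
        this.members = members;
        return this;
    }

    GuardedBuilder withStaticMembers(Map<String, String> staticMembers) {
        this.staticMembers = staticMembers;
        return this;
    }

    // Point 1: every defensive copy lives in this one method.
    // Map.copyOf is shallow; mutable values would need a real deep copy.
    GuardedBuilder freezeInputs() {
        members = Map.copyOf(members);
        staticMembers = Map.copyOf(staticMembers);
        frozen = true;
        return this;
    }

    // Point 2: building on another thread is only allowed after freezeInputs().
    // The same-thread path skips the copies entirely.
    int build() {
        if (Thread.currentThread() != ownerThread && !frozen) {
            throw new IllegalStateException(
                "freezeInputs() must be called before building on another thread");
        }
        // Placeholder result: just the number of inputs seen.
        return members.size() + staticMembers.size();
    }

    // Point 3: a test can compare this count against a hardcoded value so that
    // adding a new with...() setter forces a review of freezeInputs().
    static long countWithMethods() {
        return Arrays.stream(GuardedBuilder.class.getDeclaredMethods())
            .filter(m -> m.getName().startsWith("with"))
            .count();
    }
}
```

A test along the lines of point 3 would assert that `GuardedBuilder.countWithMethods()` equals the expected count (2 in this sketch) and fail with a message pointing at `freezeInputs()` when it does not.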