bbejeck commented on code in PR #20486: URL: https://github.com/apache/kafka/pull/20486#discussion_r2330287941
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java: ########## @@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() { assertEquals(numTasks, allStandbyTasks.size()); } + @Test + public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() { + // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3 + // Node 2 has active tasks 2,3 and standby tasks 0,1 + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))), + mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3)))); + + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", + mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))), + mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1)))); + + // Node 3 joins as new client + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + + final Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))), + new TopologyDescriberImpl(4, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + final Set<Integer> allAssignedActiveTasks = new HashSet<>(); + allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1")); + allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2")); + allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3")); + assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks); + + // Verify all standby tasks are assigned + final Set<Integer> allAssignedStandbyTasks = new HashSet<>(); + allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1")); + allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2")); + allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3")); + assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks); + + // Verify each member has 1-2 active tasks and at most 3 tasks total + assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2); + assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3); + + assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2); + assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3); + + assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2); + assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3); Review Comment: We're confirming the size or the number of tasks vs. the sub-topology where they are from but the test below confirms that already -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org