Copilot commented on code in PR #20511: URL: https://github.com/apache/kafka/pull/20511#discussion_r2333356980
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java: ########## @@ -170,39 +167,56 @@ void testOnTasksAssigned() { ); inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); inOrder.verify(taskManager).handleRebalanceComplete(); + inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment); } @Test void testOnTasksAssignedWithException() { final Exception exception = new RuntimeException("sample exception"); doThrow(exception).when(taskManager).handleAssignment(any(), any()); - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned(new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())); - assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty()); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + + final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + ); + assertTrue(result.isPresent()); assertEquals(exception, result.get()); - verify(taskManager).handleLostAll(); + verify(taskManager).handleAssignment(any(), any()); verify(streamThread, never()).setState(StreamThread.State.PARTITIONS_ASSIGNED); verify(taskManager, never()).handleRebalanceComplete(); + verify(streamsRebalanceData, never()).setReconciledAssignment(any()); } @Test void testOnAllTasksLost() { - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + 
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty()); - verify(taskManager).handleLostAll(); + Review Comment: Lines 199 and 201 contain unnecessary trailing whitespace that should be removed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java: ########## @@ -170,39 +167,56 @@ void testOnTasksAssigned() { ); inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); inOrder.verify(taskManager).handleRebalanceComplete(); + inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment); } @Test void testOnTasksAssignedWithException() { final Exception exception = new RuntimeException("sample exception"); doThrow(exception).when(taskManager).handleAssignment(any(), any()); - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned(new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())); - assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty()); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + + final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + ); + Review Comment: There are multiple trailing whitespace issues in this test file. Lines 181 and 185 have unnecessary trailing spaces that should be removed for consistency. 
```suggestion final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned( new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) ); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1532,21 +1512,32 @@ private void runRebalanceCallbacksOnClose() { int memberEpoch = groupMetadata.get().get().generationId(); - Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get(); + final Exception error; + + if (streamsRebalanceListenerInvoker.isPresent()) { - if (assignedPartitions.isEmpty()) - // Nothing to revoke. - return; + if (memberEpoch > 0) + error = streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked(); + else + error = streamsRebalanceListenerInvoker.get().invokeAllTasksLost(); - SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); - droppedPartitions.addAll(assignedPartitions); + } else { - final Exception error; + Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get(); - if (memberEpoch > 0) - error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions); - else - error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions); + if (assignedPartitions.isEmpty()) + // Nothing to revoke. + return; + + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(assignedPartitions); + + if (memberEpoch > 0) + error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions); + else + error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions); Review Comment: The if-else statements on lines 1535-1538 should use braces for consistency with the project's coding style, even for single-line statements. 
```suggestion if (memberEpoch > 0) { error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions); } else { error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions); } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1532,21 +1512,32 @@ private void runRebalanceCallbacksOnClose() { int memberEpoch = groupMetadata.get().get().generationId(); - Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get(); + final Exception error; + + if (streamsRebalanceListenerInvoker.isPresent()) { - if (assignedPartitions.isEmpty()) - // Nothing to revoke. - return; + if (memberEpoch > 0) + error = streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked(); + else + error = streamsRebalanceListenerInvoker.get().invokeAllTasksLost(); Review Comment: The if-else statements on lines 1519-1522 should use braces for consistency with the project's coding style, even for single-line statements. ```suggestion if (memberEpoch > 0) { error = streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked(); } else { error = streamsRebalanceListenerInvoker.get().invokeAllTasksLost(); } ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java: ########## @@ -170,39 +167,56 @@ void testOnTasksAssigned() { ); inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); inOrder.verify(taskManager).handleRebalanceComplete(); + inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment); } @Test void testOnTasksAssignedWithException() { final Exception exception = new RuntimeException("sample exception"); doThrow(exception).when(taskManager).handleAssignment(any(), any()); - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned(new 
StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())); - assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty()); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + + final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + ); + assertTrue(result.isPresent()); assertEquals(exception, result.get()); - verify(taskManager).handleLostAll(); + verify(taskManager).handleAssignment(any(), any()); verify(streamThread, never()).setState(StreamThread.State.PARTITIONS_ASSIGNED); verify(taskManager, never()).handleRebalanceComplete(); + verify(streamsRebalanceData, never()).setReconciledAssignment(any()); } @Test void testOnAllTasksLost() { - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty()); - verify(taskManager).handleLostAll(); + + final InOrder inOrder = inOrder(taskManager, streamsRebalanceData); + inOrder.verify(taskManager).handleLostAll(); + inOrder.verify(streamsRebalanceData).setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY); } @Test void testOnAllTasksLostWithException() { final Exception exception = new RuntimeException("sample exception"); doThrow(exception).when(taskManager).handleLostAll(); - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + 
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); + Review Comment: Line 215 contains unnecessary trailing whitespace that should be removed. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org