Copilot commented on code in PR #20511:
URL: https://github.com/apache/kafka/pull/20511#discussion_r2333356980


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java:
##########
@@ -170,39 +167,56 @@ void testOnTasksAssigned() {
         );
         
inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         inOrder.verify(taskManager).handleRebalanceComplete();
+        
inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment);
     }
 
     @Test
     void testOnTasksAssignedWithException() {
         final Exception exception = new RuntimeException("sample exception");
         doThrow(exception).when(taskManager).handleAssignment(any(), any());
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(new 
StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()));
-        assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        
+        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+        );
+        
         assertTrue(result.isPresent());
         assertEquals(exception, result.get());
-        verify(taskManager).handleLostAll();
+        verify(taskManager).handleAssignment(any(), any());
         verify(streamThread, 
never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         verify(taskManager, never()).handleRebalanceComplete();
+        verify(streamsRebalanceData, never()).setReconciledAssignment(any());
     }
 
     @Test
     void testOnAllTasksLost() {
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        
         assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
-        verify(taskManager).handleLostAll();
+        

Review Comment:
   Lines 199 and 201 are otherwise-blank lines that contain only whitespace; 
this trailing whitespace should be removed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java:
##########
@@ -170,39 +167,56 @@ void testOnTasksAssigned() {
         );
         
inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         inOrder.verify(taskManager).handleRebalanceComplete();
+        
inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment);
     }
 
     @Test
     void testOnTasksAssignedWithException() {
         final Exception exception = new RuntimeException("sample exception");
         doThrow(exception).when(taskManager).handleAssignment(any(), any());
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(new 
StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()));
-        assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        
+        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+        );
+        

Review Comment:
   This test file has several trailing-whitespace issues. Lines 181 and 185 are 
otherwise-blank lines that contain only spaces; remove the spaces for 
consistency.
   ```suggestion
   
           final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(
               new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
           );
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1532,21 +1512,32 @@ private void runRebalanceCallbacksOnClose() {
 
         int memberEpoch = groupMetadata.get().get().generationId();
 
-        Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
+        final Exception error;
+
+        if (streamsRebalanceListenerInvoker.isPresent()) {
 
-        if (assignedPartitions.isEmpty())
-            // Nothing to revoke.
-            return;
+            if (memberEpoch > 0)
+                error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked();
+            else
+                error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksLost();
 
-        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-        droppedPartitions.addAll(assignedPartitions);
+        } else {
 
-        final Exception error;
+            Set<TopicPartition> assignedPartitions = 
groupAssignmentSnapshot.get();
 
-        if (memberEpoch > 0)
-            error = 
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
-        else
-            error = 
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+            if (assignedPartitions.isEmpty())
+                // Nothing to revoke.
+                return;
+
+            SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+            droppedPartitions.addAll(assignedPartitions);
+
+            if (memberEpoch > 0)
+                error = 
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+            else
+                error = 
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);

Review Comment:
   The if-else statement on lines 1535-1538 should use braces for consistency 
with the project's coding style, even though each branch is a single statement.
   ```suggestion
               if (memberEpoch > 0) {
                   error = 
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
               } else {
                   error = 
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
               }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1532,21 +1512,32 @@ private void runRebalanceCallbacksOnClose() {
 
         int memberEpoch = groupMetadata.get().get().generationId();
 
-        Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
+        final Exception error;
+
+        if (streamsRebalanceListenerInvoker.isPresent()) {
 
-        if (assignedPartitions.isEmpty())
-            // Nothing to revoke.
-            return;
+            if (memberEpoch > 0)
+                error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked();
+            else
+                error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksLost();

Review Comment:
   The if-else statement on lines 1519-1522 should use braces for consistency 
with the project's coding style, even though each branch is a single statement.
   ```suggestion
               if (memberEpoch > 0) {
                   error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked();
               } else {
                   error = 
streamsRebalanceListenerInvoker.get().invokeAllTasksLost();
               }
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java:
##########
@@ -170,39 +167,56 @@ void testOnTasksAssigned() {
         );
         
inOrder.verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         inOrder.verify(taskManager).handleRebalanceComplete();
+        
inOrder.verify(streamsRebalanceData).setReconciledAssignment(assignment);
     }
 
     @Test
     void testOnTasksAssignedWithException() {
         final Exception exception = new RuntimeException("sample exception");
         doThrow(exception).when(taskManager).handleAssignment(any(), any());
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
-        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(new 
StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()));
-        assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        
+        final Optional<Exception> result = 
defaultStreamsRebalanceListener.onTasksAssigned(
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+        );
+        
         assertTrue(result.isPresent());
         assertEquals(exception, result.get());
-        verify(taskManager).handleLostAll();
+        verify(taskManager).handleAssignment(any(), any());
         verify(streamThread, 
never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         verify(taskManager, never()).handleRebalanceComplete();
+        verify(streamsRebalanceData, never()).setReconciledAssignment(any());
     }
 
     @Test
     void testOnAllTasksLost() {
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        
         assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
-        verify(taskManager).handleLostAll();
+        
+        final InOrder inOrder = inOrder(taskManager, streamsRebalanceData);
+        inOrder.verify(taskManager).handleLostAll();
+        
inOrder.verify(streamsRebalanceData).setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
     }
 
     @Test
     void testOnAllTasksLostWithException() {
         final Exception exception = new RuntimeException("sample exception");
         doThrow(exception).when(taskManager).handleLostAll();
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        final StreamsRebalanceData streamsRebalanceData = 
mock(StreamsRebalanceData.class);
+        when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
+        createRebalanceListenerWithRebalanceData(streamsRebalanceData);
+        

Review Comment:
   Line 215 contains unnecessary trailing whitespace that should be removed.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to