C0urante commented on code in PR #14630:
URL: https://github.com/apache/kafka/pull/14630#discussion_r1380342842


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -161,6 +165,7 @@ public class WorkerSinkTaskTest {
     private StatusBackingStore statusBackingStore;
     @Mock
     private KafkaConsumer<byte[], byte[]> consumer;
+    private Consumer<byte[], byte[]> mockConsumer;

Review Comment:
   Nit: it's a little strange to call this `mockConsumer` when there's also a 
`consumer` field that itself contains a mocked consumer. To avoid confusion, 
can we just make this a local variable for any of the tests that require it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void 
testErrorReporterConfigurationExceptionPropagation() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        // Subscribing to Topic = "test" with "MockHandleRebalance"
+        mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+        // Initial Rebalance to assign INITIAL_ASSIGNMENT which is 
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+        assertSinkMetricValue("partition-count", 2);
+        // Revoked "TOPIC_PARTITION" and second rebalnce with 
"TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+        assertSinkMetricValue("partition-count", 1);
+    }
+
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocationFail() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        // Subscribing to Topic = "test" with "MockHandleRebalance"
+        mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalanceFail());
+        // Initial Rebalance to assign INITIAL_ASSIGNMENT which is 
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+        assertSinkMetricValue("partition-count", 2);
+        // Revoked "TOPIC_PARTITION" and second rebalnce with 
"TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+        Assertions.assertThrows(AssertionError.class,
+                () -> assertSinkMetricValue("partition-count", 1));
+    }
+
+    /*
+    Correct Order to Call the updatePartitionCount
+     */
+    private class MockHandleRebalancePass implements ConsumerRebalanceListener 
{
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+            if (partitions.isEmpty()) {
+                return;
+            }
+            // Not doing anything as the objective is to test only Partition 
Count
+        }
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            // Not doing anything as we do not updatePartitionCount here.
+        }
+    }
+    /*
+    Existing flow which calls updatePartitionCount before updation of 
Subscription.
+     */
+    private class MockHandleRebalanceFail implements ConsumerRebalanceListener 
{
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            if (partitions.isEmpty()) {
+                return;
+            }
+            // Not doing anything as the objective is to test only Partition 
Count
+            
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+        }
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            // Not doing anything as we do not updatePartitionCount here.
+            
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+        }
+    }

Review Comment:
   I don't think we need any custom rebalance listeners for this PR; we should 
just be testing the one used by the `WorkerSinkTask` class.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void 
testErrorReporterConfigurationExceptionPropagation() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        // Subscribing to Topic = "test" with "MockHandleRebalance"
+        mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+        // Initial Rebalance to assign INITIAL_ASSIGNMENT which is 
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+        assertSinkMetricValue("partition-count", 2);
+        // Revoked "TOPIC_PARTITION" and second rebalnce with 
"TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+        assertSinkMetricValue("partition-count", 1);
+    }
+
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocationFail() {

Review Comment:
   This is testing the `MockConsumer` behavior more than anything else. If we 
want to do that (which I don't think is necessary but wouldn't oppose), then we 
should move this kind of testing into the `MockConsumerTest` suite.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void 
testErrorReporterConfigurationExceptionPropagation() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

Review Comment:
   If we make this a local variable, we can change the left-hand type to 
`MockConsumer<byte[], byte[]>` and then the casts later on at lines 2019 and 
2022 won't be necessary.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void 
testErrorReporterConfigurationExceptionPropagation() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        // Subscribing to Topic = "test" with "MockHandleRebalance"
+        mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());

Review Comment:
   We shouldn't be subscribing directly here. Instead, the `WorkerSinkTask` 
instance should be issuing this call as part of its [initializeAndStart 
method](https://github.com/apache/kafka/blob/0390d5b1a24f6f4d0f431928dd009aeb76b3895d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L311).
 This will also allow us to test the real consumer rebalance listener used by 
the `WorkerSinkTask` class, instead of a mocked listener.
   
   I think the sequence of calls should be something like this:
   
   - `MockConsumer::updateBeginningOffsets` (necessary because the 
`WorkerSinkTask` instance's rebalance listener will try to get the current 
position of its assignment in `onPartitionsAssigned`)
   - `WorkerSinkTask::initialize` (you can use the `TASK_CONFIG` constant in 
the test class as the argument)
   - `WorkerSinkTask::initializeAndStart`
   - `assertSinkMetricValue`, `MockConsumer::rebalance` (as many times as you 
want)
   



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void 
testErrorReporterConfigurationExceptionPropagation() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        // Subscribing to Topic = "test" with "MockHandleRebalance"
+        mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+        // Initial Rebalance to assign INITIAL_ASSIGNMENT which is 
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+        assertSinkMetricValue("partition-count", 2);
+        // Revoked "TOPIC_PARTITION" and second rebalnce with 
"TOPIC_PARTITION2"
+        ((MockConsumer<byte[], byte[]>) 
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+        assertSinkMetricValue("partition-count", 1);

Review Comment:
   Can we also add a step where we verify that the partition count is set to 0 
after `WorkerSinkTask::close` is invoked?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to