C0urante commented on code in PR #14630:
URL: https://github.com/apache/kafka/pull/14630#discussion_r1380342842
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -161,6 +165,7 @@ public class WorkerSinkTaskTest {
private StatusBackingStore statusBackingStore;
@Mock
private KafkaConsumer<byte[], byte[]> consumer;
+ private Consumer<byte[], byte[]> mockConsumer;
Review Comment:
Nit: it's a little strange to call this `mockConsumer` when there's also a
`consumer` field that itself contains a mocked consumer. To avoid confusion,
can we just make this a local variable for any of the tests that require it?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void
testErrorReporterConfigurationExceptionPropagation() {
PowerMock.verifyAll();
}
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocation() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ // Setting up Worker Sink Task to check metrics
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
+ transformationChain, mockConsumer, pluginLoader, time,
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null,
statusBackingStore, Collections::emptyList);
+ // Subscribing to Topic = "test" with "MockHandleRebalance"
+ mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+ // Initial Rebalance to assign INITIAL_ASSIGNMENT which is
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+ assertSinkMetricValue("partition-count", 2);
+ // Revoked "TOPIC_PARTITION" and second rebalnce with
"TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+ assertSinkMetricValue("partition-count", 1);
+ }
+
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocationFail() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ // Setting up Worker Sink Task to check metrics
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
+ transformationChain, mockConsumer, pluginLoader, time,
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null,
statusBackingStore, Collections::emptyList);
+ // Subscribing to Topic = "test" with "MockHandleRebalance"
+ mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalanceFail());
+ // Initial Rebalance to assign INITIAL_ASSIGNMENT which is
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+ assertSinkMetricValue("partition-count", 2);
+ // Revoked "TOPIC_PARTITION" and second rebalnce with
"TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+ Assertions.assertThrows(AssertionError.class,
+ () -> assertSinkMetricValue("partition-count", 1));
+ }
+
+ /*
+ Correct Order to Call the updatePartitionCount
+ */
+ private class MockHandleRebalancePass implements ConsumerRebalanceListener
{
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+ if (partitions.isEmpty()) {
+ return;
+ }
+ // Not doing anything as the objective is to test only Partition
Count
+ }
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ // Not doing anything as we do not updatePartitionCount here.
+ }
+ }
+ /*
+ Existing flow which calls updatePartitionCount before updation of
Subscription.
+ */
+ private class MockHandleRebalanceFail implements ConsumerRebalanceListener
{
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ if (partitions.isEmpty()) {
+ return;
+ }
+ // Not doing anything as the objective is to test only Partition
Count
+
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+ }
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ // Not doing anything as we do not updatePartitionCount here.
+
workerTask.sinkTaskMetricsGroup().recordPartitionCount(mockConsumer.assignment().size());
+ }
+ }
Review Comment:
I don't think we need any custom rebalance listeners for this PR, we should
just be testing the one used by the `WorkerSinkTask` class.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void
testErrorReporterConfigurationExceptionPropagation() {
PowerMock.verifyAll();
}
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocation() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ // Setting up Worker Sink Task to check metrics
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
+ transformationChain, mockConsumer, pluginLoader, time,
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null,
statusBackingStore, Collections::emptyList);
+ // Subscribing to Topic = "test" with "MockHandleRebalance"
+ mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+ // Initial Rebalance to assign INITIAL_ASSIGNMENT which is
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+ assertSinkMetricValue("partition-count", 2);
+ // Revoked "TOPIC_PARTITION" and second rebalnce with
"TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+ assertSinkMetricValue("partition-count", 1);
+ }
+
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocationFail() {
Review Comment:
This is testing the `MockConsumer` behavior more than anything else. If we
want to do that (which I don't think is necessary but wouldn't oppose), then we
should move this kind of testing into the `MockConsumerTest` suite.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void
testErrorReporterConfigurationExceptionPropagation() {
PowerMock.verifyAll();
}
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocation() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
Review Comment:
If we make this a local variable, we can change the left-hand type to
`MockConsumer<byte[], byte[]>` and then the casts later on at lines 2019 and
2022 won't be necessary.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void
testErrorReporterConfigurationExceptionPropagation() {
PowerMock.verifyAll();
}
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocation() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ // Setting up Worker Sink Task to check metrics
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
+ transformationChain, mockConsumer, pluginLoader, time,
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null,
statusBackingStore, Collections::emptyList);
+ // Subscribing to Topic = "test" with "MockHandleRebalance"
+ mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
Review Comment:
We shouldn't be subscribing directly here. Instead, the `WorkerSinkTask`
instance should be issuing this call as part of its [initializeAndStart
method](https://github.com/apache/kafka/blob/0390d5b1a24f6f4d0f431928dd009aeb76b3895d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L311).
This will also allow us to test the real consumer rebalance listener used by
the `WorkerSinkTask` class, instead of a mocked listener.
I think the sequence of calls should be something like this:
- `MockConsumer::updateBeginningOffsets` (necessary because the
`WorkerSinkTask` instance's rebalance listener will try to get the current
position of its assignment in `onPartitionsAssigned`)
- `WorkerSinkTask::initialize` (you can use the `TASK_CONFIG` constant in
the test class as the argument)
- `WorkerSinkTask::initializeAndStart`
- `assertSinkMetricValue`, `MockConsumer::rebalance` (as many times as you
want)
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -1999,6 +2004,81 @@ public void
testErrorReporterConfigurationExceptionPropagation() {
PowerMock.verifyAll();
}
+ @Test
+ public void testPartitionCountInCaseOfPartitionRevocation() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ // Setting up Worker Sink Task to check metrics
+ workerTask = new WorkerSinkTask(
+ taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
+ keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
+ transformationChain, mockConsumer, pluginLoader, time,
+ RetryWithToleranceOperatorTest.NOOP_OPERATOR, null,
statusBackingStore, Collections::emptyList);
+ // Subscribing to Topic = "test" with "MockHandleRebalance"
+ mockConsumer.subscribe(asList(TOPIC), new MockHandleRebalancePass());
+ // Initial Rebalance to assign INITIAL_ASSIGNMENT which is
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(INITIAL_ASSIGNMENT);
+ assertSinkMetricValue("partition-count", 2);
+ // Revoked "TOPIC_PARTITION" and second rebalnce with
"TOPIC_PARTITION2"
+ ((MockConsumer<byte[], byte[]>)
mockConsumer).rebalance(Collections.singleton(TOPIC_PARTITION2));
+ assertSinkMetricValue("partition-count", 1);
Review Comment:
Can we also add a step where we verify that the partition count is set to 0
after `WorkerSinkTask::close` is invoked?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]