gharris1727 commented on code in PR #15313:
URL: https://github.com/apache/kafka/pull/15313#discussion_r1481920004
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -343,6 +354,103 @@ public void testShutdown() throws Exception {
         verify(headerConverter).close();
     }
 
+    @Test
+    public void testPollRedelivery() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
+                .thenAnswer(expectConsumerPoll(1))
+                // Retry delivery should succeed
+                .thenAnswer(expectConsumerPoll(0))
+                .thenAnswer(expectConsumerPoll(1))

Review Comment:
   The original test didn't have this additional record, and the current test passes without it.

   The test also has 4 iteration() calls, which should be:
   1. initial assignment
   2. first record
   3. after pause, redelivery
   4. after request commit

   I think there should only be 3 thenAnswer calls here, and the expectConsumerPoll(1) is the one that should be removed.
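   For reference, a rough sketch of the trimmed stubbing chain this implies. This is hypothetical — the quoted diff is truncated at the flagged line — and it assumes expectPollInitialAssignment() stubs the initial-assignment poll itself, and that the redelivery and post-commit iterations both see empty polls:

```java
// Sketch only: three thenAnswer stubs covering four iteration() calls, since Mockito
// reuses the last stubbed answer for any subsequent poll() invocations.
expectPollInitialAssignment()                      // iteration 1 -- initial assignment
        // iteration 2 -- first record; the retriable exception pauses the consumer
        .thenAnswer(expectConsumerPoll(1))
        // iteration 3 -- paused consumer returns nothing; the same cached batch is redelivered
        .thenAnswer(expectConsumerPoll(0))
        // iteration 4 -- poll after the requested commit (assumed empty)
        .thenAnswer(expectConsumerPoll(0));
```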
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() {
         // Second iteration--second call to poll, partial consumer revocation
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(2)).put(Collections.emptyList());
 
         // Third iteration--third call to poll, partial consumer assignment
         workerTask.iteration();
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION3));
         verify(sinkTask, times(3)).put(Collections.emptyList());
 
         // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(4)).put(Collections.emptyList());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                // Put one message through the task to get some offsets to commit
+                .thenAnswer(expectConsumerPoll(1))
+                // the second put will return after the task is stopped and cancelled (asynchronously)
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        doAnswer(invocation -> null)
+                .doAnswer(invocation -> null)
+                .doAnswer(invocation -> {
+                    workerTask.stop();
+                    workerTask.cancel();
+                    return null;
+                })
+                .when(sinkTask).put(anyList());
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        workerTask.execute();
+
+        // stop wakes up the consumer
+        verify(consumer).wakeup();
+
+        verify(sinkTask).close(any(Collection.class));

Review Comment:
   You can remove this unchecked suppression

```suggestion
        verify(sinkTask).close(any());
```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -558,6 +832,143 @@ public void testMetricsGroup() {
         assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
     }
 
+    @Test
+    public void testHeaders() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, headers));
+
+        expectConversionAndTransformation(null, headers);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+
+        assertEquals("header_value", record.headers().lastWithName("header_key").value());
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() {
+        StringConverter stringConverter = new StringConverter();
+        SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
+
+        createTask(initialState, stringConverter, testConverter, stringConverter);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        String keyA = "a";
+        String valueA = "Árvíztűrő tükörfúrógép";
+        Headers headersA = new RecordHeaders();
+        String encodingA = "latin2";
+        headersA.add("encoding", encodingA.getBytes());
+
+        String keyB = "b";
+        String valueB = "Тестовое сообщение";
+        Headers headersB = new RecordHeaders();
+        String encodingB = "koi8_r";
+        headersB.add("encoding", encodingB.getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
+                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
+                    );
+                    return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
+                });
+
+        expectTransformation(null);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record

Review Comment:
   the original comment is wrong

```suggestion
        workerTask.iteration(); // iter 2 -- deliver records
```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() {
         // Second iteration--second call to poll, partial consumer revocation
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(2)).put(Collections.emptyList());
 
         // Third iteration--third call to poll, partial consumer assignment
         workerTask.iteration();
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION3));
         verify(sinkTask, times(3)).put(Collections.emptyList());
 
         // Fourth iteration--fourth call to poll, one partition lost; can't commit offsets for it, one new partition assigned
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(4)).put(Collections.emptyList());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                // Put one message through the task to get some offsets to commit
+                .thenAnswer(expectConsumerPoll(1))
+                // the second put will return after the task is stopped and cancelled (asynchronously)
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        doAnswer(invocation -> null)
+                .doAnswer(invocation -> null)
+                .doAnswer(invocation -> {
+                    workerTask.stop();
+                    workerTask.cancel();
+                    return null;
+                })
+                .when(sinkTask).put(anyList());
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        workerTask.execute();
+
+        // stop wakes up the consumer
+        verify(consumer).wakeup();
+
+        verify(sinkTask).close(any(Collection.class));
+    }
+
+    @Test
+    public void testDeliveryWithMutatingTransform() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(expectConsumerPoll(0));
+
+        expectConversionAndTransformation("newtopic_", new RecordHeaders());
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        sinkTaskContext.getValue().requestCommit();
+        assertTrue(sinkTaskContext.getValue().isCommitRequested());
+
+        assertNotEquals(offsets, workerTask.lastCommittedOffsets());
+        workerTask.iteration(); // triggers the commit
+
+        ArgumentCaptor<OffsetCommitCallback> callback = ArgumentCaptor.forClass(OffsetCommitCallback.class);
+        verify(consumer).commitAsync(eq(offsets), callback.capture());
+
+        callback.getValue().onComplete(offsets, null);
+
+        assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
+        assertEquals(offsets, workerTask.lastCommittedOffsets());
+        assertEquals(0, workerTask.commitFailures());
+        assertEquals(1.0,
+                metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001);
+    }
+
+    @Test
+    public void testMissingTimestampPropagation() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME, new RecordHeaders()));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        // we expect null for missing timestamp, the sentinel value of Record.NO_TIMESTAMP is Kafka's API
+        assertNull(record.timestamp());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+    }
+
+    @Test
+    public void testTimestampPropagation() {
+        final Long timestamp = System.currentTimeMillis();
+        final TimestampType timestampType = TimestampType.CREATE_TIME;
+
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, timestamp, timestampType, new RecordHeaders()));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(timestampType, record.timestampType());
+    }
+
+    @Test
+    public void testTopicsRegex() {
+        Map<String, String> props = new HashMap<>(TASK_PROPS);
+        props.remove("topics");
+        props.put("topics.regex", "te.*");
+        TaskConfig taskConfig = new TaskConfig(props);
+
+        createTask(TargetState.PAUSED);
+
+        workerTask.initialize(taskConfig);
+        workerTask.initializeAndStart();
+
+        ArgumentCaptor<Pattern> topicsRegex = ArgumentCaptor.forClass(Pattern.class);
+
+        verify(consumer).subscribe(topicsRegex.capture(), rebalanceListener.capture());

Review Comment:
   Could you add an assertion on the topicsRegex?
```suggestion
        verify(consumer).subscribe(topicsRegex.capture(), rebalanceListener.capture());
        assertEquals("te.*", topicsRegex.getValue().pattern());
```

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -162,7 +153,6 @@ public class WorkerSinkTaskTest {
 
     @Mock
     private ErrorHandlingMetrics errorHandlingMetrics;

Review Comment:
   Above here, taskId1 is not used

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -343,6 +354,103 @@ public void testShutdown() throws Exception {
         verify(headerConverter).close();
     }
 
+    @Test
+    public void testPollRedelivery() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));

Review Comment:
   This is already in expectPollInitialAssignment, right?

```suggestion
```
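   For context, a paraphrased sketch of what expectPollInitialAssignment() presumably does in this test class — a hypothetical reconstruction, not the verbatim helper — which is why repeating the assignment/position stubbing in the test body would be redundant:

```java
private OngoingStubbing<ConsumerRecords<byte[], byte[]>> expectPollInitialAssignment() {
    // Same assignment/position stubbing as the two lines flagged above
    when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
    INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));

    // First poll fires the rebalance callback and delivers no records
    return when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
        rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
        return ConsumerRecords.empty();
    });
}
```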
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -558,6 +832,143 @@ public void testMetricsGroup() {
         assertEquals(30, metrics.currentMetricValueAsDouble(group1.metricGroup(), "put-batch-max-time-ms"), 0.001d);
     }
 
+    @Test
+    public void testHeaders() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, headers));
+
+        expectConversionAndTransformation(null, headers);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+
+        assertEquals("header_value", record.headers().lastWithName("header_key").value());
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() {
+        StringConverter stringConverter = new StringConverter();
+        SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
+
+        createTask(initialState, stringConverter, testConverter, stringConverter);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        String keyA = "a";
+        String valueA = "Árvíztűrő tükörfúrógép";
+        Headers headersA = new RecordHeaders();
+        String encodingA = "latin2";
+        headersA.add("encoding", encodingA.getBytes());
+
+        String keyB = "b";
+        String valueB = "Тестовое сообщение";
+        Headers headersB = new RecordHeaders();
+        String encodingB = "koi8_r";
+        headersB.add("encoding", encodingB.getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
+                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
+                    );
+                    return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
+                });
+
+        expectTransformation(null);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        Iterator<SinkRecord> iterator = records.getValue().iterator();
+
+        SinkRecord recordA = iterator.next();
+        assertEquals(keyA, recordA.key());
+        assertEquals(valueA, recordA.value());
+
+        SinkRecord recordB = iterator.next();
+        assertEquals(keyB, recordB.key());
+        assertEquals(valueB, recordB.value());
+    }
+
+    @Test
+    public void testOriginalTopicWithTopicMutatingTransformations() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation("newtopic_", new RecordHeaders());
+
+        workerTask.iteration(); // initial assignment
+        workerTask.iteration(); // first record delivered
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+        assertEquals(TOPIC, record.originalTopic());
+        assertEquals("newtopic_" + TOPIC, record.topic());
+    }
+
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList);
+        mockConsumer.updateBeginningOffsets(
+                new HashMap<TopicPartition, Long>() {{
+                        put(TOPIC_PARTITION, 0 * 1L);
+                        put(TOPIC_PARTITION2, 0 * 1L);

Review Comment:
   This is a warning in my IDE and the multiply doesn't seem to serve any purpose.

```suggestion
                        put(TOPIC_PARTITION, 0L);
                        put(TOPIC_PARTITION2, 0L);
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org