gharris1727 commented on code in PR #15313:
URL: https://github.com/apache/kafka/pull/15313#discussion_r1481920004


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -343,6 +354,103 @@ public void testShutdown() throws Exception {
         verify(headerConverter).close();
     }
 
+    @Test
+    public void testPollRedelivery() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                // If a retriable exception is thrown, we should redeliver the 
same batch, pausing the consumer in the meantime
+                .thenAnswer(expectConsumerPoll(1))
+                // Retry delivery should succeed
+                .thenAnswer(expectConsumerPoll(0))
+                .thenAnswer(expectConsumerPoll(1))

Review Comment:
   The original test didn't have this additional record, and the current test 
passes without it.
   The test also has 4 iteration() calls, which correspond to:
   1. initial assignment
   2. first record
   3. after pause, redelivery
   4. after request commit
   
   I think there should only be 3 `thenAnswer` calls here, and the final 
`expectConsumerPoll(1)` is the one that should be removed.
   



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() {
 
         // Second iteration--second call to poll, partial consumer revocation
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(2)).put(Collections.emptyList());
 
         // Third iteration--third call to poll, partial consumer assignment
         workerTask.iteration();
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION3));
         verify(sinkTask, times(3)).put(Collections.emptyList());
 
         // Fourth iteration--fourth call to poll, one partition lost; can't 
commit offsets for it, one new partition assigned
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(4)).put(Collections.emptyList());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                // Put one message through the task to get some offsets to 
commit
+                .thenAnswer(expectConsumerPoll(1))
+                // the second put will return after the task is stopped and 
cancelled (asynchronously)
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        doAnswer(invocation -> null)
+                .doAnswer(invocation -> null)
+                .doAnswer(invocation -> {
+                    workerTask.stop();
+                    workerTask.cancel();
+                    return null;
+                })
+                .when(sinkTask).put(anyList());
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        workerTask.execute();
+
+        // stop wakes up the consumer
+        verify(consumer).wakeup();
+
+        verify(sinkTask).close(any(Collection.class));

Review Comment:
   You can remove this unchecked suppression
   ```suggestion
           verify(sinkTask).close(any());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -558,6 +832,143 @@ public void testMetricsGroup() {
         assertEquals(30, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"put-batch-max-time-ms"), 0.001d);
     }
 
+    @Test
+    public void testHeaders() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, headers));
+
+        expectConversionAndTransformation(null, headers);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+
+        assertEquals("header_value", 
record.headers().lastWithName("header_key").value());
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() {
+        StringConverter stringConverter = new StringConverter();
+        SampleConverterWithHeaders testConverter = new 
SampleConverterWithHeaders();
+
+        createTask(initialState, stringConverter, testConverter, 
stringConverter);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        String keyA = "a";
+        String valueA = "Árvíztűrő tükörfúrógép";
+        Headers headersA = new RecordHeaders();
+        String encodingA = "latin2";
+        headersA.add("encoding", encodingA.getBytes());
+
+        String keyB = "b";
+        String valueB = "Тестовое сообщение";
+        Headers headersB = new RecordHeaders();
+        String encodingB = "koi8_r";
+        headersB.add("encoding", encodingB.getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) 
invocation -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
+                            new ConsumerRecord<>(TOPIC, PARTITION, 
FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, 
TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyA.getBytes(), 
valueA.getBytes(encodingA), headersA, Optional.empty()),
+                            new ConsumerRecord<>(TOPIC, PARTITION, 
FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, 
TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyB.getBytes(), 
valueB.getBytes(encodingB), headersB, Optional.empty())
+                    );
+                    return new ConsumerRecords<>(Collections.singletonMap(new 
TopicPartition(TOPIC, PARTITION), records));
+                });
+
+        expectTransformation(null);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record

Review Comment:
   The original comment is wrong.
   ```suggestion
           workerTask.iteration(); // iter 2 -- deliver records
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -482,21 +590,187 @@ public void testPartialRevocationAndAssignment() {
 
         // Second iteration--second call to poll, partial consumer revocation
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(2)).put(Collections.emptyList());
 
         // Third iteration--third call to poll, partial consumer assignment
         workerTask.iteration();
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION3));
         verify(sinkTask, times(3)).put(Collections.emptyList());
 
         // Fourth iteration--fourth call to poll, one partition lost; can't 
commit offsets for it, one new partition assigned
         workerTask.iteration();
-        verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION3));
-        verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION));
+        verify(sinkTask).close(singleton(TOPIC_PARTITION3));
+        verify(sinkTask).open(singleton(TOPIC_PARTITION));
         verify(sinkTask, times(4)).put(Collections.emptyList());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                // Put one message through the task to get some offsets to 
commit
+                .thenAnswer(expectConsumerPoll(1))
+                // the second put will return after the task is stopped and 
cancelled (asynchronously)
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        doAnswer(invocation -> null)
+                .doAnswer(invocation -> null)
+                .doAnswer(invocation -> {
+                    workerTask.stop();
+                    workerTask.cancel();
+                    return null;
+                })
+                .when(sinkTask).put(anyList());
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        workerTask.execute();
+
+        // stop wakes up the consumer
+        verify(consumer).wakeup();
+
+        verify(sinkTask).close(any(Collection.class));
+    }
+
+    @Test
+    public void testDeliveryWithMutatingTransform() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectTaskGetTopic();
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(expectConsumerPoll(0));
+
+        expectConversionAndTransformation("newtopic_", new RecordHeaders());
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+
+        sinkTaskContext.getValue().requestCommit();
+        assertTrue(sinkTaskContext.getValue().isCommitRequested());
+
+        assertNotEquals(offsets, workerTask.lastCommittedOffsets());
+        workerTask.iteration(); // triggers the commit
+
+        ArgumentCaptor<OffsetCommitCallback> callback = 
ArgumentCaptor.forClass(OffsetCommitCallback.class);
+        verify(consumer).commitAsync(eq(offsets), callback.capture());
+
+        callback.getValue().onComplete(offsets, null);
+
+        assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should 
have been cleared
+        assertEquals(offsets, workerTask.lastCommittedOffsets());
+        assertEquals(0, workerTask.commitFailures());
+        assertEquals(1.0, 
metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), 
"batch-size-max"), 0.0001);
+    }
+
+    @Test
+    public void testMissingTimestampPropagation() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, 
TimestampType.CREATE_TIME, new RecordHeaders()));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        // we expect null for missing timestamp, the sentinel value of 
Record.NO_TIMESTAMP is Kafka's API
+        assertNull(record.timestamp());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+    }
+
+    @Test
+    public void testTimestampPropagation() {
+        final Long timestamp = System.currentTimeMillis();
+        final TimestampType timestampType = TimestampType.CREATE_TIME;
+
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, timestamp, timestampType, 
new RecordHeaders()));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        SinkRecord record = records.getValue().iterator().next();
+
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(timestampType, record.timestampType());
+    }
+
+    @Test
+    public void testTopicsRegex() {
+        Map<String, String> props = new HashMap<>(TASK_PROPS);
+        props.remove("topics");
+        props.put("topics.regex", "te.*");
+        TaskConfig taskConfig = new TaskConfig(props);
+
+        createTask(TargetState.PAUSED);
+
+        workerTask.initialize(taskConfig);
+        workerTask.initializeAndStart();
+
+        ArgumentCaptor<Pattern> topicsRegex = 
ArgumentCaptor.forClass(Pattern.class);
+
+        verify(consumer).subscribe(topicsRegex.capture(), 
rebalanceListener.capture());

Review Comment:
   Could you add an assertion on the topicsRegex?
   ```suggestion
           verify(consumer).subscribe(topicsRegex.capture(), 
rebalanceListener.capture());
           assertEquals("te.*", topicsRegex.getValue().pattern());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -162,7 +153,6 @@ public class WorkerSinkTaskTest {
     @Mock
     private ErrorHandlingMetrics errorHandlingMetrics;

Review Comment:
   Just above this, `taskId1` is declared but never used and can be removed.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -343,6 +354,103 @@ public void testShutdown() throws Exception {
         verify(headerConverter).close();
     }
 
+    @Test
+    public void testPollRedelivery() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));

Review Comment:
   This is already in expectPollInitialAssignment, right?
   ```suggestion
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -558,6 +832,143 @@ public void testMetricsGroup() {
         assertEquals(30, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"put-batch-max-time-ms"), 0.001d);
     }
 
+    @Test
+    public void testHeaders() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1, headers));
+
+        expectConversionAndTransformation(null, headers);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+
+        assertEquals("header_value", 
record.headers().lastWithName("header_key").value());
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() {
+        StringConverter stringConverter = new StringConverter();
+        SampleConverterWithHeaders testConverter = new 
SampleConverterWithHeaders();
+
+        createTask(initialState, stringConverter, testConverter, 
stringConverter);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        String keyA = "a";
+        String valueA = "Árvíztűrő tükörfúrógép";
+        Headers headersA = new RecordHeaders();
+        String encodingA = "latin2";
+        headersA.add("encoding", encodingA.getBytes());
+
+        String keyB = "b";
+        String valueB = "Тестовое сообщение";
+        Headers headersB = new RecordHeaders();
+        String encodingB = "koi8_r";
+        headersB.add("encoding", encodingB.getBytes());
+
+        expectPollInitialAssignment()
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) 
invocation -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
+                            new ConsumerRecord<>(TOPIC, PARTITION, 
FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, 
TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyA.getBytes(), 
valueA.getBytes(encodingA), headersA, Optional.empty()),
+                            new ConsumerRecord<>(TOPIC, PARTITION, 
FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, 
TimestampType.NO_TIMESTAMP_TYPE,
+                                    0, 0, keyB.getBytes(), 
valueB.getBytes(encodingB), headersB, Optional.empty())
+                    );
+                    return new ConsumerRecords<>(Collections.singletonMap(new 
TopicPartition(TOPIC, PARTITION), records));
+                });
+
+        expectTransformation(null);
+
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> records = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(records.capture());
+
+        Iterator<SinkRecord> iterator = records.getValue().iterator();
+
+        SinkRecord recordA = iterator.next();
+        assertEquals(keyA, recordA.key());
+        assertEquals(valueA, recordA.value());
+
+        SinkRecord recordB = iterator.next();
+        assertEquals(keyB, recordB.key());
+        assertEquals(valueB, recordB.value());
+    }
+
+    @Test
+    public void testOriginalTopicWithTopicMutatingTransformations() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1));
+
+        expectConversionAndTransformation("newtopic_", new RecordHeaders());
+
+        workerTask.iteration(); // initial assignment
+        workerTask.iteration(); // first record delivered
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Collection<SinkRecord>> recordCapture = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(2)).put(recordCapture.capture());
+
+        assertEquals(1, recordCapture.getValue().size());
+        SinkRecord record = recordCapture.getValue().iterator().next();
+        assertEquals(TOPIC, record.originalTopic());
+        assertEquals("newtopic_" + TOPIC, record.topic());
+    }
+
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.noopOperator(), null, 
statusBackingStore, Collections::emptyList);
+        mockConsumer.updateBeginningOffsets(
+                new HashMap<TopicPartition, Long>() {{
+                    put(TOPIC_PARTITION, 0 * 1L);
+                    put(TOPIC_PARTITION2, 0 * 1L);

Review Comment:
   This triggers a warning in my IDE, and the multiplication by 1 doesn't 
serve any purpose.
   ```suggestion
                       put(TOPIC_PARTITION, 0L);
                       put(TOPIC_PARTITION2, 0L);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to