This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 64ebc76df9 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) 64ebc76df9 is described below commit 64ebc76df96cdc9cea419c0ab09cfa82d0ca743c Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Mon Jul 18 18:07:20 2022 -0400 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak. (cherry picked and slightly modified from commit 63e06aafd0cf37f8488c3830946051b3a30db2a0) Reviewers: Chris Egerton <fearthecel...@gmail.com>, Randall Hauch <rha...@gmail.com> --- .../connect/runtime/AbstractWorkerSourceTask.java | 5 +++++ .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index d89f577688..407f5fd828 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -406,6 +407,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { } log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord); producerSendFailed(false, producerRecord, preTransformRecord, e); + if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { + counter.skipRecord(); + submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack); + } } else { counter.completeRecord(); log.trace("{} Wrote record successfully: topic {} partition {} offset {}", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 5ce0e44f3e..2d2cd00cf5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -696,24 +696,39 @@ public class WorkerSourceTaskTest extends ThreadedTest { createWorkerTaskWithErrorToleration(); expectTopicCreation(TOPIC); + //Use different offsets for each record so we can verify all were committed + final Map<String, Object> offset2 = Collections.singletonMap("key", 13); + // send two records // record 1 will succeed // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator // and no ConnectException will be thrown SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - + SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + expectOffsetFlush(true); expectSendRecordOnce(); expectSendRecordProducerCallbackFail(); sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); - EasyMock.expectLastCall(); + + //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) + //Only the last offset will be passed to the method as everything up to that point is committed + //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked + offsetWriter.offset(PARTITION, offset2); + PowerMock.expectLastCall(); PowerMock.replayAll(); + //Send records and then commit offsets and verify both were committed and no exception Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); Whitebox.invokeMethod(workerTask, "sendRecords"); + Whitebox.invokeMethod(workerTask, "updateCommittableOffsets"); + workerTask.commitOffsets(); PowerMock.verifyAll(); + + //Double check to make sure all submitted records were cleared + assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask, + "submittedRecords")).records.size()); } @Test