rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431357309



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.sink;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;

Review comment:
       This leftover line should be removed.
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class InternalSinkRecord extends SinkRecord {
+
+    private final ConsumerRecord<byte[], byte[]> originalRecord;
+
+    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
+        super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
+            record.timestampType(), record.headers());
+        this.originalRecord = originalRecord;
+    }

Review comment:
       We need to override the `newRecord(...)` method that takes all of the parameters:
   ```suggestion
    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
        this(originalRecord, record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
                record.timestampType(), record.headers());
    }

    public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord,
        String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
        Long timestamp, TimestampType timestampType, Iterable<Header> headers
    ) {
        super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, headers);
        this.originalRecord = originalRecord;
    }

    @Override
    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
            Long timestamp, Iterable<Header> headers) {
        return new InternalSinkRecord(originalRecord, topic, kafkaPartition, keySchema, key,
                valueSchema, value, kafkaOffset(), timestamp, timestampType(), headers);
    }

   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
##########
@@ -28,6 +33,17 @@
      */
     void report(ProcessingContext context);
 
+    /**
+     * Report an error and return the producer future.
+     *
+     * @param context the processing context (cannot be null).
+     * @return future result from the producer sending a record to Kafka
+     */
+    default Future<RecordMetadata> reportAndReturnFuture(ProcessingContext context) {
+        report(context);
+        return CompletableFuture.completedFuture(null);
+    }
+

Review comment:
       This is an internal API, so why can we not just change the existing `report(...)` method to return `Future<?>`?
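       For illustration, that change could look roughly like this (a sketch only, not the final API; `ProcessingContext` is the existing class from this package):
   ```java
   package org.apache.kafka.connect.runtime.errors;

   import java.util.concurrent.Future;

   import org.apache.kafka.clients.producer.RecordMetadata;

   public interface ErrorReporter {

       /**
        * Report an error and return a future that completes once any error
        * record has been written to Kafka.
        *
        * @param context the processing context (cannot be null).
        * @return the producer's future, or an already-completed future if the
        *         reporter writes nothing to Kafka
        */
       Future<RecordMetadata> report(ProcessingContext context);
   }
   ```
       Reporters that do not produce anything (e.g. a log-only reporter) could simply return `CompletableFuture.completedFuture(null)`, just as the default method in the diff above does.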

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());

Review comment:
       ```suggestion
            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), record.value());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                -1, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by sending errant sink records to Kafka. This
+     * method is intended to be used to block on all outstanding errant record
+     * futures.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while calling future.get()");
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();

Review comment:
       Let's use queue-style access, since it saves us from having to clear the list and would still work if we need it to be concurrent.
   ```suggestion
           Future<?> future = null;
           while ((future = futures.poll()) != null) {
               try {
                   future.get();
               } catch (InterruptedException | ExecutionException e) {
                   log.error("Encountered an error while calling ");
                   throw new ConnectException(e);
               }
           }
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +505,21 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
         log.trace("{} Applying transformations to record in topic '{}' 
partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, 
keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
             recordActiveTopic(origRecord.topic());
         }
-        return transformationChain.apply(origRecord);
+
+        // Apply the transformations
+        SinkRecord transformedRecord = transformationChain.apply(origRecord);
+

Review comment:
       Nit: let's remove this blank line, since there are already quite a few.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -360,6 +364,10 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        if (workerErrantRecordReporter != null) {
+            workerErrantRecordReporter.getAllFutures();

Review comment:
       Let's add a trace log message before and after this call.
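       For example, something like this (a sketch only; the message wording is illustrative, and `awaitAllFutures()` is the name proposed in a later comment on this review):
   ```java
   if (workerErrantRecordReporter != null) {
       // Trace before and after so slow errant-record futures are visible in the logs
       log.trace("{} Awaiting all futures returned by the errant record reporter", this);
       workerErrantRecordReporter.awaitAllFutures();
       log.trace("{} Finished awaiting all futures returned by the errant record reporter", this);
   }
   ```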

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -21,11 +21,13 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+

Review comment:
       Nit: let's remove this blank line, since it's unrelated to other changes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -111,6 +116,7 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
         this.connConfig = connConfig;
         this.connectorTaskId = id;
         this.errorHandlingMetrics = errorHandlingMetrics;
+        this.dlqTopicName = connConfig.dlqTopicName();

Review comment:
       Should we trim this?
   ```suggestion
           this.dlqTopicName = connConfig.dlqTopicName().trim();
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {

Review comment:
       ```suggestion
        // Most of the records will be an internal sink record, but the task could potentially
           // report modified or new records, so handle both cases
           if (record instanceof InternalSinkRecord) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        ConsumerRecord<byte[], byte[]> consumerRecord;
+
+        if (record instanceof InternalSinkRecord) {
+            consumerRecord = ((InternalSinkRecord) record).originalRecord();
+        } else {
+            String topic = record.topic();
+            byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
+            byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(),
+                record.value());
+
+            RecordHeaders headers = new RecordHeaders();
+            if (record.headers() != null) {
+                for (Header header : record.headers()) {
+                    String headerKey = header.key();
+                    byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey,
+                        header.schema(), header.value());
+                    headers.add(headerKey, rawHeader);
+                }
+            }
+
+            consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1,
+                -1, key, value, headers);
+        }
+
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, error);
+
+        if (!future.isDone()) {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by sending errant sink records to Kafka. This
+     * method is intended to be used to block on all outstanding errant record
+     * futures.
+     */
+    public void getAllFutures() {

Review comment:
       Let's rename this to `awaitAllFutures()` since this really is not a getter method.
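       Combined with the queue-style drain suggested earlier, the renamed method could look roughly like this (a sketch within the quoted class, so `futures`, `log`, and `ConnectException` are the class's existing members and imports):
   ```java
   /**
    * Block until every future returned by the errant record reporter has
    * completed, removing each future from the queue as it completes.
    */
   public void awaitAllFutures() {
       Future<?> future;
       while ((future = futures.poll()) != null) {
           try {
               future.get();
           } catch (InterruptedException | ExecutionException e) {
               log.error("Encountered an error while calling future.get()");
               throw new ConnectException(e);
           }
       }
   }
   ```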

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +505,21 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+

Review comment:
       Nit: let's remove this blank line, since it's unrelated to other changes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;

Review comment:
       This can be package-private and final:
   ```suggestion
       final LinkedList<Future<Void>> futures;
   ```
   

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ErrantRecordSinkConnector extends MonitorableSinkConnector {
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ErrantRecordSinkTask.class;
+    }
+
+    public static class ErrantRecordSinkTask extends MonitorableSinkTask {
+        private ErrantRecordReporter reporter;
+
+        public ErrantRecordSinkTask() {
+            super();
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            super.start(props);
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6

Review comment:
       @aakashnshah let's remove this comment




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

