[ https://issues.apache.org/jira/browse/KAFKA-7228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567511#comment-16567511 ]
ASF GitHub Bot commented on KAFKA-7228: --------------------------------------- ewencp closed pull request #5440: KAFKA-7228: Set errorHandlingMetrics for dead letter queue URL: https://github.com/apache/kafka/pull/5440 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1f62103213c..65d85caf90e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -524,14 +524,13 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { ArrayList<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id, connConfig); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); reporters.add(logReporter); // check if topic for dead letter queue exists String topic = connConfig.dlqTopicName(); if (topic != null && !topic.isEmpty()) { - DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps); + DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics); reporters.add(reporter); } @@ -541,8 +540,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { 
List<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id, connConfig); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); reporters.add(logReporter); return reporters; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index d36ec22ec88..c059dcff793 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -36,6 +36,7 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import static java.util.Collections.singleton; @@ -66,13 +67,14 @@ private final SinkConnectorConfig connConfig; private final ConnectorTaskId connectorTaskId; + private final ErrorHandlingMetrics errorHandlingMetrics; private KafkaProducer<byte[], byte[]> kafkaProducer; - private ErrorHandlingMetrics errorHandlingMetrics; public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, ConnectorTaskId id, - SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) { + SinkConnectorConfig sinkConfig, Map<String, Object> producerProps, + ErrorHandlingMetrics errorHandlingMetrics) { String topic = sinkConfig.dlqTopicName(); try (AdminClient admin = AdminClient.create(workerConfig.originals())) { @@ -90,7 +92,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, } KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); - return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id); + return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics); } 
/** @@ -99,14 +101,16 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, * @param kafkaProducer a Kafka Producer to produce the original consumed records. */ // Visible for testing - DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) { + DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, + ConnectorTaskId id, ErrorHandlingMetrics errorHandlingMetrics) { + Objects.requireNonNull(kafkaProducer); + Objects.requireNonNull(connConfig); + Objects.requireNonNull(id); + Objects.requireNonNull(errorHandlingMetrics); + this.kafkaProducer = kafkaProducer; this.connConfig = connConfig; this.connectorTaskId = id; - } - - @Override - public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { this.errorHandlingMetrics = errorHandlingMetrics; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java index f7df1b2d1a3..58336163fbf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java @@ -28,12 +28,4 @@ */ void report(ProcessingContext context); - /** - * Provides the container for error handling metrics to implementations. This method will be called once the error - * reporter object is instantiated. - * - * @param errorHandlingMetrics metrics for error handling (cannot be null). 
- */ - void metrics(ErrorHandlingMetrics errorHandlingMetrics); - } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java index e81bd547568..8b07adf8e49 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java @@ -21,6 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; + /** * Writes errors and their context to application logs. */ @@ -30,12 +32,16 @@ private final ConnectorTaskId id; private final ConnectorConfig connConfig; + private final ErrorHandlingMetrics errorHandlingMetrics; - private ErrorHandlingMetrics errorHandlingMetrics; + public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { + Objects.requireNonNull(id); + Objects.requireNonNull(connConfig); + Objects.requireNonNull(errorHandlingMetrics); - public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) { this.id = id; this.connConfig = connConfig; + this.errorHandlingMetrics = errorHandlingMetrics; } /** @@ -57,11 +63,6 @@ public void report(ProcessingContext context) { errorHandlingMetrics.recordErrorLogged(); } - @Override - public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { - this.errorHandlingMetrics = errorHandlingMetrics; - } - // Visible for testing String message(ProcessingContext context) { return String.format("Error encountered in task %s. 
%s", String.valueOf(id), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index e931642afcc..1bf9c717068 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -166,8 +166,7 @@ public void testErrorHandlingInSinkTasks() throws Exception { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); @@ -218,8 +217,7 @@ public void testErrorHandlingInSourceTasks() throws Exception { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); @@ -283,8 +281,7 @@ public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter 
reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java index f199982231f..fa628b09840 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java @@ -94,10 +94,15 @@ public void tearDown() { } } + @Test(expected = NullPointerException.class) + public void initializeDLQWithNullMetrics() { + new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID, null); + } + @Test public void testDLQConfigWithEmptyTopicName() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(emptyMap()), TASK_ID, errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -111,8 +116,8 @@ public void testDLQConfigWithEmptyTopicName() { @Test public void testDLQConfigWithValidTopicName() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics); ProcessingContext context = 
processingContext(); @@ -126,8 +131,8 @@ public void testDLQConfigWithValidTopicName() { @Test public void testReportDLQTwice() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -142,8 +147,7 @@ public void testReportDLQTwice() { @Test public void testLogOnDisabledLogReporter() { - LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap())); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics); ProcessingContext context = processingContext(); context.error(new RuntimeException()); @@ -155,8 +159,7 @@ public void testLogOnDisabledLogReporter() { @Test public void testLogOnEnabledLogReporter() { - LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"))); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics); ProcessingContext context = processingContext(); context.error(new RuntimeException()); @@ -168,8 +171,7 @@ public void testLogOnEnabledLogReporter() { @Test public void testLogMessageWithNoRecords() { - LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"))); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics); ProcessingContext 
context = processingContext(); @@ -184,8 +186,7 @@ public void testLogMessageWithSinkRecords() { props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter logReporter = new LogReporter(TASK_ID, config(props)); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(props), errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -208,7 +209,7 @@ public void testDlqHeaderConsumerRecord() { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics); ProcessingContext context = new ProcessingContext(); context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes())); @@ -236,7 +237,7 @@ public void testDlqHeaderIsAppended() { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics); ProcessingContext context = new ProcessingContext(); context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes())); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > DeadLetterQueue throws a NullPointerException > --------------------------------------------- > > Key: KAFKA-7228 > URL: https://issues.apache.org/jira/browse/KAFKA-7228 > Project: Kafka > Issue Type: Task > Components: KafkaConnect > Reporter: Arjun Satish > Assignee: Arjun Satish > Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Using the dead letter queue results in a NPE: > {code:java} > [2018-08-01 07:41:08,907] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:124) > at > org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2018-08-01 07:41:08,908] ERROR WorkerSinkTask{id=s3-sink-yanivr-2-4} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > DLQ reporter does not get an {{errorHandlingMetrics}} object when initialized > through the WorkerSinkTask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)