This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new 2af214a KAFKA-7228: Set errorHandlingMetrics for dead letter queue 2af214a is described below commit 2af214a51cef984056f9cb403b8541945082238d Author: Arjun Satish <ar...@confluent.io> AuthorDate: Thu Aug 2 14:36:02 2018 -0700 KAFKA-7228: Set errorHandlingMetrics for dead letter queue DLQ reporter does not get a `errorHandlingMetrics` object when created by the worker. This results in an NPE. Signed-off-by: Arjun Satish <arjunconfluent.io> *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* Author: Arjun Satish <ar...@confluent.io> Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io> Closes #5440 from wicknicks/KAFKA-7228 (cherry picked from commit 70d882861e1bf3eb503c84a31834e8b628de2df9) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- .../org/apache/kafka/connect/runtime/Worker.java | 8 ++---- .../runtime/errors/DeadLetterQueueReporter.java | 20 +++++++------ .../connect/runtime/errors/ErrorReporter.java | 8 ------ .../kafka/connect/runtime/errors/LogReporter.java | 15 +++++----- .../connect/runtime/ErrorHandlingTaskTest.java | 9 ++---- .../connect/runtime/errors/ErrorReporterTest.java | 33 +++++++++++----------- 6 files changed, 43 insertions(+), 50 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 7291d4f..1096584 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -523,14 +523,13 @@ public class Worker { private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { ArrayList<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id, connConfig); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); reporters.add(logReporter); // check if topic for dead letter queue exists String topic = connConfig.dlqTopicName(); if (topic != null && !topic.isEmpty()) { - DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps); + DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics); reporters.add(reporter); } @@ -540,8 +539,7 @@ public class Worker { private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { List<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id, connConfig); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics); reporters.add(logReporter); return reporters; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index d36ec22..c059dcf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import static java.util.Collections.singleton; @@ -66,13 +67,14 @@ public class DeadLetterQueueReporter implements ErrorReporter { private final SinkConnectorConfig connConfig; private final ConnectorTaskId connectorTaskId; + private final ErrorHandlingMetrics errorHandlingMetrics; private KafkaProducer<byte[], byte[]> kafkaProducer; - private ErrorHandlingMetrics errorHandlingMetrics; public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, ConnectorTaskId id, - SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) { + SinkConnectorConfig sinkConfig, Map<String, Object> producerProps, + ErrorHandlingMetrics errorHandlingMetrics) { String topic = sinkConfig.dlqTopicName(); try (AdminClient admin = AdminClient.create(workerConfig.originals())) { @@ -90,7 +92,7 @@ public class DeadLetterQueueReporter implements ErrorReporter { } KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); - return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id); + return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics); } /** @@ -99,14 +101,16 @@ public class DeadLetterQueueReporter implements ErrorReporter { * @param kafkaProducer a Kafka Producer to produce the original consumed records. */ // Visible for testing - DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) { + DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, + ConnectorTaskId id, ErrorHandlingMetrics errorHandlingMetrics) { + Objects.requireNonNull(kafkaProducer); + Objects.requireNonNull(connConfig); + Objects.requireNonNull(id); + Objects.requireNonNull(errorHandlingMetrics); + this.kafkaProducer = kafkaProducer; this.connConfig = connConfig; this.connectorTaskId = id; - } - - @Override - public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { this.errorHandlingMetrics = errorHandlingMetrics; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java index f7df1b2..5833616 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java @@ -28,12 +28,4 @@ public interface ErrorReporter { */ void report(ProcessingContext context); - /** - * Provides the container for error handling metrics to implementations. This method will be called once the error - * reporter object is instantiated. - * - * @param errorHandlingMetrics metrics for error handling (cannot be null). - */ - void metrics(ErrorHandlingMetrics errorHandlingMetrics); - } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java index e81bd54..8b07adf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java @@ -21,6 +21,8 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; + /** * Writes errors and their context to application logs. */ @@ -30,12 +32,16 @@ public class LogReporter implements ErrorReporter { private final ConnectorTaskId id; private final ConnectorConfig connConfig; + private final ErrorHandlingMetrics errorHandlingMetrics; - private ErrorHandlingMetrics errorHandlingMetrics; + public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { + Objects.requireNonNull(id); + Objects.requireNonNull(connConfig); + Objects.requireNonNull(errorHandlingMetrics); - public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) { this.id = id; this.connConfig = connConfig; + this.errorHandlingMetrics = errorHandlingMetrics; } /** @@ -57,11 +63,6 @@ public class LogReporter implements ErrorReporter { errorHandlingMetrics.recordErrorLogged(); } - @Override - public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { - this.errorHandlingMetrics = errorHandlingMetrics; - } - // Visible for testing String message(ProcessingContext context) { return String.format("Error encountered in task %s. %s", String.valueOf(id), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index e931642..1bf9c71 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -166,8 +166,7 @@ public class ErrorHandlingTaskTest { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); @@ -218,8 +217,7 @@ public class ErrorHandlingTaskTest { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); @@ -283,8 +281,7 @@ public class ErrorHandlingTaskTest { Map<String, String> reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); - reporter.metrics(errorHandlingMetrics); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java index f199982..fa628b0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java @@ -94,10 +94,15 @@ public class ErrorReporterTest { } } + @Test(expected = NullPointerException.class) + public void initializeDLQWithNullMetrics() { + new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID, null); + } + @Test public void testDLQConfigWithEmptyTopicName() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(emptyMap()), TASK_ID, errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -111,8 +116,8 @@ public class ErrorReporterTest { @Test public void testDLQConfigWithValidTopicName() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -126,8 +131,8 @@ public class ErrorReporterTest { @Test public void testReportDLQTwice() { - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID); - deadLetterQueueReporter.metrics(errorHandlingMetrics); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter( + producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -142,8 +147,7 @@ public class ErrorReporterTest { @Test public void testLogOnDisabledLogReporter() { - LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap())); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics); ProcessingContext context = processingContext(); context.error(new RuntimeException()); @@ -155,8 +159,7 @@ public class ErrorReporterTest { @Test public void testLogOnEnabledLogReporter() { - LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"))); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics); ProcessingContext context = processingContext(); context.error(new RuntimeException()); @@ -168,8 +171,7 @@ public class ErrorReporterTest { @Test public void testLogMessageWithNoRecords() { - LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"))); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -184,8 +186,7 @@ public class ErrorReporterTest { props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); - LogReporter logReporter = new LogReporter(TASK_ID, config(props)); - logReporter.metrics(errorHandlingMetrics); + LogReporter logReporter = new LogReporter(TASK_ID, config(props), errorHandlingMetrics); ProcessingContext context = processingContext(); @@ -208,7 +209,7 @@ public class ErrorReporterTest { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics); ProcessingContext context = new ProcessingContext(); context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes())); @@ -236,7 +237,7 @@ public class ErrorReporterTest { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); - DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics); ProcessingContext context = new ProcessingContext(); context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));