This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 2af214a  KAFKA-7228: Set errorHandlingMetrics for dead letter queue
2af214a is described below

commit 2af214a51cef984056f9cb403b8541945082238d
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Thu Aug 2 14:36:02 2018 -0700

    KAFKA-7228: Set errorHandlingMetrics for dead letter queue
    
    The DLQ reporter does not get an `errorHandlingMetrics` object when created by
    the worker. This results in an NPE (see the illustrative sketch after the file
    summary below).
    
    Signed-off-by: Arjun Satish <arjunconfluent.io>
    
    Author: Arjun Satish <ar...@confluent.io>
    
    Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #5440 from wicknicks/KAFKA-7228
    
    (cherry picked from commit 70d882861e1bf3eb503c84a31834e8b628de2df9)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  8 ++----
 .../runtime/errors/DeadLetterQueueReporter.java    | 20 +++++++------
 .../connect/runtime/errors/ErrorReporter.java      |  8 ------
 .../kafka/connect/runtime/errors/LogReporter.java  | 15 +++++-----
 .../connect/runtime/ErrorHandlingTaskTest.java     |  9 ++----
 .../connect/runtime/errors/ErrorReporterTest.java  | 33 +++++++++++-----------
 6 files changed, 43 insertions(+), 50 deletions(-)
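
For context, here is a minimal, self-contained sketch of the failure mode and the
fix (hypothetical classes, not the actual Connect ones): the old design handed the
metrics object to reporters through a setter, the worker never called that setter
for the DLQ reporter, so the field stayed null and reporting threw an NPE. The
patch moves the dependency into the constructor so a missing metrics object fails
fast at creation time.

    import java.util.Objects;

    // Stand-in for ErrorHandlingMetrics in this sketch.
    class Metrics {
        void recordError() { System.out.println("error recorded"); }
    }

    // Old shape: metrics arrives via a setter that may never be called,
    // leaving the field null until report() blows up.
    class SetterReporter {
        private Metrics metrics;
        void metrics(Metrics m) { this.metrics = m; }
        void report() { metrics.recordError(); } // NPE if metrics(...) was skipped
    }

    // New shape (the pattern this patch adopts): the dependency is required at
    // construction, so the object can never exist without its metrics.
    class ConstructorReporter {
        private final Metrics metrics;
        ConstructorReporter(Metrics metrics) {
            this.metrics = Objects.requireNonNull(metrics);
        }
        void report() { metrics.recordError(); }
    }

    public class ReporterWiringSketch {
        public static void main(String[] args) {
            new ConstructorReporter(new Metrics()).report();   // works
            try {
                new ConstructorReporter(null);                 // rejected up front
            } catch (NullPointerException expected) {
                System.out.println("null metrics caught at construction");
            }
        }
    }

The real change applies the same constructor-injection pattern to LogReporter and
DeadLetterQueueReporter, guards each argument with Objects.requireNonNull, and
drops the now-unused ErrorReporter.metrics(...) setter.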

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7291d4f..1096584 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -523,14 +523,13 @@ public class Worker {
     private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig,
                                                   ErrorHandlingMetrics errorHandlingMetrics) {
         ArrayList<ErrorReporter> reporters = new ArrayList<>();
-        LogReporter logReporter = new LogReporter(id, connConfig);
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
         reporters.add(logReporter);
 
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
-            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps, errorHandlingMetrics);
             reporters.add(reporter);
         }
 
@@ -540,8 +539,7 @@ public class Worker {
     private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
                                                       ErrorHandlingMetrics errorHandlingMetrics) {
         List<ErrorReporter> reporters = new ArrayList<>();
-        LogReporter logReporter = new LogReporter(id, connConfig);
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
         reporters.add(logReporter);
 
         return reporters;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index d36ec22..c059dcf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 
 import static java.util.Collections.singleton;
@@ -66,13 +67,14 @@ public class DeadLetterQueueReporter implements ErrorReporter {
 
     private final SinkConnectorConfig connConfig;
     private final ConnectorTaskId connectorTaskId;
+    private final ErrorHandlingMetrics errorHandlingMetrics;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
-    private ErrorHandlingMetrics errorHandlingMetrics;
 
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
                                                          ConnectorTaskId id,
-                                                         SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
+                                                         SinkConnectorConfig sinkConfig, Map<String, Object> producerProps,
+                                                         ErrorHandlingMetrics errorHandlingMetrics) {
         String topic = sinkConfig.dlqTopicName();
 
         try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
@@ -90,7 +92,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
     }
 
     /**
@@ -99,14 +101,16 @@ public class DeadLetterQueueReporter implements ErrorReporter {
      * @param kafkaProducer a Kafka Producer to produce the original consumed records.
      */
     // Visible for testing
-    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) {
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig,
+                            ConnectorTaskId id, ErrorHandlingMetrics errorHandlingMetrics) {
+        Objects.requireNonNull(kafkaProducer);
+        Objects.requireNonNull(connConfig);
+        Objects.requireNonNull(id);
+        Objects.requireNonNull(errorHandlingMetrics);
+
         this.kafkaProducer = kafkaProducer;
         this.connConfig = connConfig;
         this.connectorTaskId = id;
-    }
-
-    @Override
-    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
         this.errorHandlingMetrics = errorHandlingMetrics;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index f7df1b2..5833616 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -28,12 +28,4 @@ public interface ErrorReporter {
      */
     void report(ProcessingContext context);
 
-    /**
-     * Provides the container for error handling metrics to implementations. This method will be called once the error
-     * reporter object is instantiated.
-     *
-     * @param errorHandlingMetrics metrics for error handling (cannot be null).
-     */
-    void metrics(ErrorHandlingMetrics errorHandlingMetrics);
-
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
index e81bd54..8b07adf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -21,6 +21,8 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
+
 /**
  * Writes errors and their context to application logs.
  */
@@ -30,12 +32,16 @@ public class LogReporter implements ErrorReporter {
 
     private final ConnectorTaskId id;
     private final ConnectorConfig connConfig;
+    private final ErrorHandlingMetrics errorHandlingMetrics;
 
-    private ErrorHandlingMetrics errorHandlingMetrics;
+    public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) {
+        Objects.requireNonNull(id);
+        Objects.requireNonNull(connConfig);
+        Objects.requireNonNull(errorHandlingMetrics);
 
-    public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) {
         this.id = id;
         this.connConfig = connConfig;
+        this.errorHandlingMetrics = errorHandlingMetrics;
     }
 
     /**
@@ -57,11 +63,6 @@ public class LogReporter implements ErrorReporter {
         errorHandlingMetrics.recordErrorLogged();
     }
 
-    @Override
-    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
-        this.errorHandlingMetrics = errorHandlingMetrics;
-    }
-
     // Visible for testing
     String message(ProcessingContext context) {
         return String.format("Error encountered in task %s. %s", 
String.valueOf(id),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index e931642..1bf9c71 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -166,8 +166,7 @@ public class ErrorHandlingTaskTest {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
         reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
-        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps));
-        reporter.metrics(errorHandlingMetrics);
+        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -218,8 +217,7 @@ public class ErrorHandlingTaskTest {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
         reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
-        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps));
-        reporter.metrics(errorHandlingMetrics);
+        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -283,8 +281,7 @@ public class ErrorHandlingTaskTest {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
         reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
-        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps));
-        reporter.metrics(errorHandlingMetrics);
+        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f199982..fa628b0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -94,10 +94,15 @@ public class ErrorReporterTest {
         }
     }
 
+    @Test(expected = NullPointerException.class)
+    public void initializeDLQWithNullMetrics() {
+        new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID, null);
+    }
+
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
-        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+                producer, config(emptyMap()), TASK_ID, errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
 
@@ -111,8 +116,8 @@ public class ErrorReporterTest {
 
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
-        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+                producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
 
@@ -126,8 +131,8 @@ public class ErrorReporterTest {
 
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
-        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+                producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
 
@@ -142,8 +147,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testLogOnDisabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()));
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
         context.error(new RuntimeException());
@@ -155,8 +159,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testLogOnEnabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
         context.error(new RuntimeException());
@@ -168,8 +171,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testLogMessageWithNoRecords() {
-        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")), errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
 
@@ -184,8 +186,7 @@ public class ErrorReporterTest {
         props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
         props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
 
-        LogReporter logReporter = new LogReporter(TASK_ID, config(props));
-        logReporter.metrics(errorHandlingMetrics);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(props), errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
 
@@ -208,7 +209,7 @@ public class ErrorReporterTest {
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
         props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
 
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, 
"source-key".getBytes(), "source-value".getBytes()));
@@ -236,7 +237,7 @@ public class ErrorReporterTest {
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
         props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
 
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, 
"source-key".getBytes(), "source-value".getBytes()));
