This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 22f1724 KAFKA-7434: Fix NPE in DeadLetterQueueReporter 22f1724 is described below commit 22f1724123c267352116c18db1abdee25c31b382 Author: Michał Borowiecki <mbo...@gmail.com> AuthorDate: Sat Sep 29 10:19:10 2018 -0700 KAFKA-7434: Fix NPE in DeadLetterQueueReporter *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* Author: Michał Borowiecki <mbo...@gmail.com> Reviewers: Arjun Satish <ar...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io> Closes #5700 from mihbor/KAFKA-7434 --- .../runtime/errors/DeadLetterQueueReporter.java | 6 ++++- .../connect/runtime/errors/ErrorReporterTest.java | 30 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index c059dcf..2312269 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -199,6 +199,10 @@ public class DeadLetterQueueReporter implements ErrorReporter { } private byte[] toBytes(String value) { - return value.getBytes(StandardCharsets.UTF_8); + if (value != null) { + return value.getBytes(StandardCharsets.UTF_8); + } else { + return null; + } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java index fa628b0..00a922f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java @@ -59,6 +59,7 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID; import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @@ -205,6 +206,7 @@ public class ErrorReporterTest { assertEquals(configuration.dlqTopicReplicationFactor(), 7); } + @Test public void testDlqHeaderConsumerRecord() { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); @@ -233,6 +235,34 @@ public class ErrorReporterTest { } @Test + public void testDlqHeaderOnNullExceptionMessage() { + Map<String, String> props = new HashMap<>(); + props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC); + props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics); + + ProcessingContext context = new ProcessingContext(); + context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes())); + context.currentContext(Stage.TRANSFORMATION, Transformation.class); + context.error(new NullPointerException()); + + ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes()); + + deadLetterQueueReporter.populateContextHeaders(producerRecord, context); + assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC)); + assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION)); + assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET)); + assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME)); + assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID)); + assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE)); + assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS)); + assertEquals(NullPointerException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION)); + assertNull(producerRecord.headers().lastHeader(ERROR_HEADER_EXCEPTION_MESSAGE).value()); + assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0); + assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("java.lang.NullPointerException")); + } + + @Test public void testDlqHeaderIsAppended() { Map<String, String> props = new HashMap<>(); props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);