This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 266c367  allow ReplaceField SMT to handle tombstone records (#7731)
266c367 is described below

commit 266c367e6c83faabb9a8df956e5f97d5812a4426
Author: Lev Zemlyanov <lzeml...@purdue.edu>
AuthorDate: Wed Feb 12 14:36:06 2020 -0800

    allow ReplaceField SMT to handle tombstone records (#7731)
    
    Signed-off-by: Lev Zemlyanov <l...@confluent.io>
---
 .../kafka/connect/transforms/ReplaceField.java     |  4 ++-
 .../kafka/connect/transforms/ReplaceFieldTest.java | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index f071bda..3d9abc2 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -123,7 +123,9 @@ public abstract class ReplaceField<R extends 
ConnectRecord<R>> implements Transf
 
     @Override
     public R apply(R record) {
-        if (operatingSchema(record) == null) {
+        if (operatingValue(record) == null) {
+            return record;
+        } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
         } else {
             return applyWithSchema(record);
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 6a1a13a..7ab00ed 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class ReplaceFieldTest {
     private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
@@ -37,6 +38,43 @@ public class ReplaceFieldTest {
     }
 
     @Test
+    public void tombstoneSchemaless() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, 
null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertNull(transformedRecord.valueSchema());
+    }
+
+    @Test
+    public void tombstoneWithSchema() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Schema schema = SchemaBuilder.struct()
+            .field("dont", Schema.STRING_SCHEMA)
+            .field("abc", Schema.INT32_SCHEMA)
+            .field("foo", Schema.BOOLEAN_SCHEMA)
+            .field("etc", Schema.STRING_SCHEMA)
+            .build();
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, 
schema, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertEquals(schema, transformedRecord.valueSchema());
+    }
+
+    @Test
     public void schemaless() {
         final Map<String, String> props = new HashMap<>();
         props.put("blacklist", "dont");

Reply via email to