This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
     new 266c367  allow ReplaceField SMT to handle tombstone records (#7731)
266c367 is described below

commit 266c367e6c83faabb9a8df956e5f97d5812a4426
Author: Lev Zemlyanov <lzeml...@purdue.edu>
AuthorDate: Wed Feb 12 14:36:06 2020 -0800

    allow ReplaceField SMT to handle tombstone records (#7731)

    Signed-off-by: Lev Zemlyanov <l...@confluent.io>
---
 .../kafka/connect/transforms/ReplaceField.java     |  4 ++-
 .../kafka/connect/transforms/ReplaceFieldTest.java | 38 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index f071bda..3d9abc2 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -123,7 +123,9 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
 
     @Override
     public R apply(R record) {
-        if (operatingSchema(record) == null) {
+        if (operatingValue(record) == null) {
+            return record;
+        } else if (operatingSchema(record) == null) {
             return applySchemaless(record);
         } else {
             return applyWithSchema(record);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index 6a1a13a..7ab00ed 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class ReplaceFieldTest {
     private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
@@ -37,6 +38,43 @@ public class ReplaceFieldTest {
     }
 
     @Test
+    public void tombstoneSchemaless() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertNull(transformedRecord.valueSchema());
+    }
+
+    @Test
+    public void tombstoneWithSchema() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Schema schema = SchemaBuilder.struct()
+                .field("dont", Schema.STRING_SCHEMA)
+                .field("abc", Schema.INT32_SCHEMA)
+                .field("foo", Schema.BOOLEAN_SCHEMA)
+                .field("etc", Schema.STRING_SCHEMA)
+                .build();
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, schema, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.value());
+        assertEquals(schema, transformedRecord.valueSchema());
+    }
+
+    @Test
     public void schemaless() {
         final Map<String, String> props = new HashMap<>();
         props.put("blacklist", "dont");