This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch properties-sink in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 3a274e71b18c18e43f702a57db80032fc97e2fa9 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu May 21 08:41:27 2020 +0200 CamelSinkTask Properties need to be cleaned up before going ahead --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 23 ++++---- .../camel/kafkaconnector/CamelSinkTaskTest.java | 66 +++++++++++----------- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 15ccd2b..d8fef18 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -171,29 +171,30 @@ public class CamelSinkTask extends SinkTask { } private void addProperty(Exchange exchange, Header singleHeader) { + String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX); Schema schema = singleHeader.schema(); if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (String)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (Boolean)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (byte[])singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (byte[])singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (float)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (float)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (double)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (double)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (short)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (short)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (long)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (long)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) { - exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (byte)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) { - exchange.getProperties().put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (Map<?, ?>)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { - exchange.getProperties().put(singleHeader.key(), (List<?>)singleHeader.value()); + exchange.getProperties().put(camelPropertyKey, (List<?>)singleHeader.value()); } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 4941741..a895d3d 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -138,13 +138,13 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); - assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte")); - assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat")); - assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort")); - assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble")); - assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); - assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); + assertTrue((boolean) exchange.getProperties().get("MyBoolean")); + assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte")); + assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat")); + assertEquals(myShort, (Short) exchange.getProperties().get("MyShort")); + assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble")); + assertEquals(myInteger, exchange.getProperties().get("MyInteger")); + assertEquals(myLong, (Long) exchange.getProperties().get("MyLong")); sinkTask.stop(); } @@ -188,13 +188,13 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); - assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte")); - assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat")); - assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort")); - assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble")); - assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); - assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); + assertTrue((boolean) exchange.getProperties().get("MyBoolean")); + assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte")); + assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat")); + assertEquals(myShort, (Short) exchange.getProperties().get("MyShort")); + assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble")); + assertEquals(myInteger, exchange.getProperties().get("MyInteger")); + assertEquals(myLong, (Long) exchange.getProperties().get("MyLong")); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); @@ -311,16 +311,16 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); - assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte")); - assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat")); - assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort")); - assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble")); - assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); - assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); - assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap")); - assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1")); - assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2")); + assertTrue((boolean) exchange.getProperties().get("MyBoolean")); + assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte")); + assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat")); + assertEquals(myShort, (Short) exchange.getProperties().get("MyShort")); + assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble")); + assertEquals(myInteger, exchange.getProperties().get("MyInteger")); + assertEquals(myLong, (Long) exchange.getProperties().get("MyLong")); + assertEquals(map, exchange.getProperties().get("MyMap")); + assertEquals(map1, exchange.getProperties().get("MyMap1")); + assertEquals(map2, exchange.getProperties().get("MyMap2")); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); @@ -432,15 +432,15 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); - assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte")); - assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat")); - assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort")); - assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble")); - assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); - assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); - assertEquals(list, exchange.getProperties().get("CamelPropertyMyList")); - assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1")); + assertTrue((boolean) exchange.getProperties().get("MyBoolean")); + assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte")); + assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat")); + assertEquals(myShort, (Short) exchange.getProperties().get("MyShort")); + assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble")); + assertEquals(myInteger, exchange.getProperties().get("MyInteger")); + assertEquals(myLong, (Long) exchange.getProperties().get("MyLong")); + assertEquals(list, exchange.getProperties().get("MyList")); + assertEquals(list1, exchange.getProperties().get("MyList1")); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class));