This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch properties-sink
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3a274e71b18c18e43f702a57db80032fc97e2fa9
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu May 21 08:41:27 2020 +0200

    CamelSinkTask Properties need to be cleaned up before going ahead
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 23 ++++----
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 66 +++++++++++-----------
 2 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 15ccd2b..d8fef18 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -171,29 +171,30 @@ public class CamelSinkTask extends SinkTask {
     }
 
     private void addProperty(Exchange exchange, Header singleHeader) {
+        String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), 
PROPERTY_CAMEL_PREFIX);
         Schema schema = singleHeader.schema();
         if 
(schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
-            exchange.getProperties().put(singleHeader.key(), 
(String)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(String)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(Boolean)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(Boolean)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(byte[])singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(byte[])singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(float)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(float)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(double)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(double)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(short)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(short)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(long)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(long)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) 
{
-            exchange.getProperties().put(singleHeader.key(), 
(byte)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(byte)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA,
 Schema.STRING_SCHEMA).type().getName())) {
-            exchange.getProperties().put(singleHeader.key(), (Map<?, 
?>)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, (Map<?, 
?>)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
 {
-            exchange.getProperties().put(singleHeader.key(), 
(List<?>)singleHeader.value());
+            exchange.getProperties().put(camelPropertyKey, 
(List<?>)singleHeader.value());
         }
     }
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 4941741..a895d3d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -138,13 +138,13 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
-        assertEquals(myByte, (Byte) 
exchange.getProperties().get("CamelPropertyMyByte"));
-        assertEquals(myFloat, (Float) 
exchange.getProperties().get("CamelPropertyMyFloat"));
-        assertEquals(myShort, (Short) 
exchange.getProperties().get("CamelPropertyMyShort"));
-        assertEquals(myDouble, (Double) 
exchange.getProperties().get("CamelPropertyMyDouble"));
-        assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
-        assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
+        assertTrue((boolean) exchange.getProperties().get("MyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("MyShort"));
+        assertEquals(myDouble, (Double) 
exchange.getProperties().get("MyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
 
         sinkTask.stop();
     }
@@ -188,13 +188,13 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
-        assertEquals(myByte, (Byte) 
exchange.getProperties().get("CamelPropertyMyByte"));
-        assertEquals(myFloat, (Float) 
exchange.getProperties().get("CamelPropertyMyFloat"));
-        assertEquals(myShort, (Short) 
exchange.getProperties().get("CamelPropertyMyShort"));
-        assertEquals(myDouble, (Double) 
exchange.getProperties().get("CamelPropertyMyDouble"));
-        assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
-        assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
+        assertTrue((boolean) exchange.getProperties().get("MyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("MyShort"));
+        assertEquals(myDouble, (Double) 
exchange.getProperties().get("MyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
         assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
         assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
@@ -311,16 +311,16 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
-        assertEquals(myByte, (Byte) 
exchange.getProperties().get("CamelPropertyMyByte"));
-        assertEquals(myFloat, (Float) 
exchange.getProperties().get("CamelPropertyMyFloat"));
-        assertEquals(myShort, (Short) 
exchange.getProperties().get("CamelPropertyMyShort"));
-        assertEquals(myDouble, (Double) 
exchange.getProperties().get("CamelPropertyMyDouble"));
-        assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
-        assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
-        assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
-        assertEquals(map1, 
exchange.getProperties().get("CamelPropertyMyMap1"));
-        assertEquals(map2, 
exchange.getProperties().get("CamelPropertyMyMap2"));
+        assertTrue((boolean) exchange.getProperties().get("MyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("MyShort"));
+        assertEquals(myDouble, (Double) 
exchange.getProperties().get("MyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
+        assertEquals(map, exchange.getProperties().get("MyMap"));
+        assertEquals(map1, exchange.getProperties().get("MyMap1"));
+        assertEquals(map2, exchange.getProperties().get("MyMap2"));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
         assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
         assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));
@@ -432,15 +432,15 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
-        assertEquals(myByte, (Byte) 
exchange.getProperties().get("CamelPropertyMyByte"));
-        assertEquals(myFloat, (Float) 
exchange.getProperties().get("CamelPropertyMyFloat"));
-        assertEquals(myShort, (Short) 
exchange.getProperties().get("CamelPropertyMyShort"));
-        assertEquals(myDouble, (Double) 
exchange.getProperties().get("CamelPropertyMyDouble"));
-        assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
-        assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
-        assertEquals(list, 
exchange.getProperties().get("CamelPropertyMyList"));
-        assertEquals(list1, 
exchange.getProperties().get("CamelPropertyMyList1"));
+        assertTrue((boolean) exchange.getProperties().get("MyBoolean"));
+        assertEquals(myByte, (Byte) exchange.getProperties().get("MyByte"));
+        assertEquals(myFloat, (Float) exchange.getProperties().get("MyFloat"));
+        assertEquals(myShort, (Short) exchange.getProperties().get("MyShort"));
+        assertEquals(myDouble, (Double) 
exchange.getProperties().get("MyDouble"));
+        assertEquals(myInteger, exchange.getProperties().get("MyInteger"));
+        assertEquals(myLong, (Long) exchange.getProperties().get("MyLong"));
+        assertEquals(list, exchange.getProperties().get("MyList"));
+        assertEquals(list1, exchange.getProperties().get("MyList1"));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
         assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class));
         assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", 
Float.class));

Reply via email to