This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de9ad6  Sink test refactoring
     new 9bcd681  Merge pull request #216 from fvaleri/sink-test-ref
1de9ad6 is described below

commit 1de9ad69644b5c46ebee471bf02e23f2930df8e7
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sat May 16 19:29:22 2020 +0200

    Sink test refactoring
---
 .../kafkaconnector/CamelSinkConnectorConfig.java   |   4 +
 .../kafkaconnector/CamelSourceConnectorConfig.java |   2 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 232 ++++++++++-----------
 3 files changed, 116 insertions(+), 122 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index e11e84d..c353c58 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,6 +38,10 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String CAMEL_SINK_URL_DOC = "The camel url to 
configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + 
CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + 
CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
+    public static final String TOPIC_DEFAULT = "test";
+    public static final String TOPIC_CONF = "topics";
+    public static final String TOPIC_DOC = "A list of topics to use as input 
for this connector";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, 
Importance.HIGH, CAMEL_SINK_URL_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, 
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 01a55b8..34e2495 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -40,7 +40,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
 
     public static final String TOPIC_DEFAULT = "test";
     public static final String TOPIC_CONF = "topics";
-    public static final String TOPIC_DOC = "The topic to publish data to";
+    public static final String TOPIC_DOC = "A list of topics to use as output 
for this connector";
 
     public static final Long CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT = 1000L;
     public static final String CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF = 
"camel.source.maxBatchPollSize";
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index ba147b8..24cd03c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -34,40 +34,41 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSinkTaskTest {
 
+    private static final String SEDA_URI = "seda:test";
+    private static final String TOPIC_NAME = "my-topic";
+    private static final long RECEIVE_TIMEOUT = 1_000;
+
     @Test
     public void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
-
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        String topic = "mytopic";
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
 
     @Test
     public void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -76,7 +77,7 @@ public class CamelSinkTaskTest {
         Long myLong = new Long("100");
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelHeaderMyBoolean", true);
         record.headers().addByte("CamelHeaderMyByte", myByte);
         record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -85,10 +86,10 @@ public class CamelSinkTaskTest {
         record.headers().addInt("CamelHeaderMyInteger", myInteger);
         record.headers().addLong("CamelHeaderMyLong", myLong);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -99,19 +100,18 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -120,7 +120,7 @@ public class CamelSinkTaskTest {
         Long myLong = new Long("100");
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelPropertyMyBoolean", true);
         record.headers().addByte("CamelPropertyMyByte", myByte);
         record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -129,10 +129,10 @@ public class CamelSinkTaskTest {
         record.headers().addInt("CamelPropertyMyInteger", myInteger);
         record.headers().addLong("CamelPropertyMyLong", myLong);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -143,19 +143,18 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, 
exchange.getProperties().get("CamelPropertyMyInteger"));
         assertEquals(myLong, (Long) 
exchange.getProperties().get("CamelPropertyMyLong"));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -164,7 +163,7 @@ public class CamelSinkTaskTest {
         Long myLong = new Long("100");
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelPropertyMyBoolean", true);
         record.headers().addByte("CamelPropertyMyByte", myByte);
         record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -180,10 +179,10 @@ public class CamelSinkTaskTest {
         record.headers().addInt("CamelHeaderMyInteger", myInteger);
         record.headers().addLong("CamelHeaderMyLong", myLong);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -201,19 +200,18 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -228,7 +226,7 @@ public class CamelSinkTaskTest {
         map2.put(1, 1);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelHeaderMyBoolean", true);
         record.headers().addByte("CamelHeaderMyByte", myByte);
         record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -240,10 +238,10 @@ public class CamelSinkTaskTest {
         record.headers().addMap("CamelHeaderMyMap1", map1, 
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
         record.headers().addMap("CamelHeaderMyMap2", map2, 
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -256,19 +254,18 @@ public class CamelSinkTaskTest {
         assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -283,7 +280,7 @@ public class CamelSinkTaskTest {
         map2.put(1, 1);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelPropertyMyBoolean", true);
         record.headers().addByte("CamelPropertyMyByte", myByte);
         record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -305,10 +302,10 @@ public class CamelSinkTaskTest {
         record.headers().addMap("CamelHeaderMyMap1", map1, 
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
         record.headers().addMap("CamelHeaderMyMap2", map2, 
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -332,19 +329,18 @@ public class CamelSinkTaskTest {
         assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
         assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -357,7 +353,7 @@ public class CamelSinkTaskTest {
         list1.add(1);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelHeaderMyBoolean", true);
         record.headers().addByte("CamelHeaderMyByte", myByte);
         record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -368,10 +364,10 @@ public class CamelSinkTaskTest {
         record.headers().addList("CamelHeaderMyList", list, 
SchemaBuilder.array(Schema.STRING_SCHEMA));
         record.headers().addList("CamelHeaderMyList1", list1, 
SchemaBuilder.array(Schema.INT64_SCHEMA));
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -383,19 +379,18 @@ public class CamelSinkTaskTest {
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
-        String topic = "mytopic";
         Byte myByte = new Byte("100");
         Float myFloat = new Float("100");
         Short myShort = new Short("100");
@@ -408,7 +403,7 @@ public class CamelSinkTaskTest {
         list1.add(1);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         record.headers().addBoolean("CamelPropertyMyBoolean", true);
         record.headers().addByte("CamelPropertyMyByte", myByte);
         record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -428,10 +423,10 @@ public class CamelSinkTaskTest {
         record.headers().addList("CamelPropertyMyList", list, 
SchemaBuilder.array(Schema.STRING_SCHEMA));
         record.headers().addList("CamelPropertyMyList1", list1, 
SchemaBuilder.array(Schema.INT64_SCHEMA));
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertTrue((boolean) 
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -453,91 +448,86 @@ public class CamelSinkTaskTest {
         assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
         assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
 
     @Test
     public void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.sink.url", "seda:test");
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, 
"shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + 
"endpointProperty", "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", 
"shouldNotBeUsed");
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
-
-        String topic = "mytopic";
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
 
     @Test
     public void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + 
"bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", 
"test");
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
-
-        String topic = "mytopic";
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(1, sinkTask.getCms().getEndpoints()
+            .stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
 
-        assertEquals(1, 
camelSinkTask.getCms().getEndpoints().stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
 
     @Test
     public void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put("topics", "mytopic");
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + 
"bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", 
"50");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", 
"test");
 
-        CamelSinkTask camelSinkTask = new CamelSinkTask();
-        camelSinkTask.start(props);
-
-        String topic = "mytopic";
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
 
         List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord(topic, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
         records.add(record);
-        camelSinkTask.put(records);
+        sinkTask.put(records);
 
-        ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
-        Exchange exchange = c.receive("seda:test", 1000L);
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
 
-        assertEquals(1, 
camelSinkTask.getCms().getEndpoints().stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
+        assertEquals(1, sinkTask.getCms().getEndpoints()
+            .stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
 
-        camelSinkTask.stop();
+        sinkTask.stop();
     }
 
 }

Reply via email to