This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 1de9ad6 Sink test refactoring new 9bcd681 Merge pull request #216 from fvaleri/sink-test-ref 1de9ad6 is described below commit 1de9ad69644b5c46ebee471bf02e23f2930df8e7 Author: Federico Valeri <fvaleri@localhost> AuthorDate: Sat May 16 19:29:22 2020 +0200 Sink test refactoring --- .../kafkaconnector/CamelSinkConnectorConfig.java | 4 + .../kafkaconnector/CamelSourceConnectorConfig.java | 2 +- .../camel/kafkaconnector/CamelSinkTaskTest.java | 232 ++++++++++----------- 3 files changed, 116 insertions(+), 122 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index e11e84d..c353c58 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -38,6 +38,10 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored."; + public static final String TOPIC_DEFAULT = "test"; + public static final String TOPIC_CONF = "topics"; + public static final String TOPIC_DOC = "A list of topics to use as input for this connector"; + private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 01a55b8..34e2495 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -40,7 +40,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig { public static final String TOPIC_DEFAULT = "test"; public static final String TOPIC_CONF = "topics"; - public static final String TOPIC_DOC = "The topic to publish data to"; + public static final String TOPIC_DOC = "A list of topics to use as output for this connector"; public static final Long CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT = 1000L; public static final String CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF = "camel.source.maxBatchPollSize"; diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index ba147b8..24cd03c 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -34,40 +34,41 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class CamelSinkTaskTest { + private static final String SEDA_URI = "seda:test"; + private static final String TOPIC_NAME = "my-topic"; + private static final long RECEIVE_TIMEOUT = 1_000; + @Test public void testOnlyBody() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); - - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - String topic = "mytopic"; + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - camelSinkTask.stop(); + sinkTask.stop(); } @Test public void testBodyAndHeaders() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -76,7 +77,7 @@ public class CamelSinkTaskTest { Long myLong = new Long("100"); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelHeaderMyBoolean", true); record.headers().addByte("CamelHeaderMyByte", myByte); record.headers().addFloat("CamelHeaderMyFloat", myFloat); @@ -85,10 +86,10 @@ public class CamelSinkTaskTest { record.headers().addInt("CamelHeaderMyInteger", myInteger); record.headers().addLong("CamelHeaderMyLong", myLong); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); @@ -99,19 +100,18 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndProperties() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -120,7 +120,7 @@ public class CamelSinkTaskTest { Long myLong = new Long("100"); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelPropertyMyBoolean", true); record.headers().addByte("CamelPropertyMyByte", myByte); record.headers().addFloat("CamelPropertyMyFloat", myFloat); @@ -129,10 +129,10 @@ public class CamelSinkTaskTest { record.headers().addInt("CamelPropertyMyInteger", myInteger); record.headers().addLong("CamelPropertyMyLong", myLong); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); @@ -143,19 +143,18 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger")); assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong")); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndPropertiesHeadersMixed() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -164,7 +163,7 @@ public class CamelSinkTaskTest { Long myLong = new Long("100"); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelPropertyMyBoolean", true); record.headers().addByte("CamelPropertyMyByte", myByte); record.headers().addFloat("CamelPropertyMyFloat", myFloat); @@ -180,10 +179,10 @@ public class CamelSinkTaskTest { record.headers().addInt("CamelHeaderMyInteger", myInteger); record.headers().addLong("CamelHeaderMyLong", myLong); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); @@ -201,19 +200,18 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndHeadersMap() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -228,7 +226,7 @@ public class CamelSinkTaskTest { map2.put(1, 1); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelHeaderMyBoolean", true); record.headers().addByte("CamelHeaderMyByte", myByte); record.headers().addFloat("CamelHeaderMyFloat", myFloat); @@ -240,10 +238,10 @@ public class CamelSinkTaskTest { record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA)); record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA)); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); @@ -256,19 +254,18 @@ public class CamelSinkTaskTest { assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class)); assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndPropertiesHeadersMapMixed() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -283,7 +280,7 @@ public class CamelSinkTaskTest { map2.put(1, 1); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelPropertyMyBoolean", true); record.headers().addByte("CamelPropertyMyByte", myByte); record.headers().addFloat("CamelPropertyMyFloat", myFloat); @@ -305,10 +302,10 @@ public class CamelSinkTaskTest { record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA)); record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA)); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); @@ -332,19 +329,18 @@ public class CamelSinkTaskTest { assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndHeadersList() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -357,7 +353,7 @@ public class CamelSinkTaskTest { list1.add(1); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelHeaderMyBoolean", true); record.headers().addByte("CamelHeaderMyByte", myByte); record.headers().addFloat("CamelHeaderMyFloat", myFloat); @@ -368,10 +364,10 @@ public class CamelSinkTaskTest { record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA)); record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA)); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); @@ -383,19 +379,18 @@ public class CamelSinkTaskTest { assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); - camelSinkTask.stop(); + sinkTask.stop(); } - + @Test public void testBodyAndPropertiesHeadersListMixed() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); - String topic = "mytopic"; Byte myByte = new Byte("100"); Float myFloat = new Float("100"); Short myShort = new Short("100"); @@ -408,7 +403,7 @@ public class CamelSinkTaskTest { list1.add(1); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); record.headers().addBoolean("CamelPropertyMyBoolean", true); record.headers().addByte("CamelPropertyMyByte", myByte); record.headers().addFloat("CamelPropertyMyFloat", myFloat); @@ -428,10 +423,10 @@ public class CamelSinkTaskTest { record.headers().addList("CamelPropertyMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA)); record.headers().addList("CamelPropertyMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA)); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean")); @@ -453,91 +448,86 @@ public class CamelSinkTaskTest { assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); - camelSinkTask.stop(); + sinkTask.stop(); } @Test public void testUrlPrecedenceOnComponentProperty() { Map<String, String> props = new HashMap<>(); - props.put("camel.sink.url", "seda:test"); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "shouldNotBeUsed"); props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed"); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); - - String topic = "mytopic"; + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - camelSinkTask.stop(); + sinkTask.stop(); } @Test public void testOnlyBodyUsingComponentProperty() { Map<String, String> props = new HashMap<>(); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda"); props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test"); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); - - String topic = "mytopic"; + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(1, sinkTask.getCms().getEndpoints() + .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count()); - assertEquals(1, camelSinkTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count()); - - camelSinkTask.stop(); + sinkTask.stop(); } @Test public void testOnlyBodyUsingMultipleComponentProperties() { Map<String, String> props = new HashMap<>(); - props.put("topics", "mytopic"); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda"); props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true"); props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test"); - CamelSinkTask camelSinkTask = new CamelSinkTask(); - camelSinkTask.start(props); - - String topic = "mytopic"; + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); records.add(record); - camelSinkTask.put(records); + sinkTask.put(records); - ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate(); - Exchange exchange = c.receive("seda:test", 1000L); + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertEquals(1, camelSinkTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count()); + assertEquals(1, sinkTask.getCms().getEndpoints() + .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count()); - camelSinkTask.stop(); + sinkTask.stop(); } }