This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 729cc55 Adds support for ElasticSearch and include integration tests new 4808e77 Merge pull request #36 from orpiske/elasticsearch 729cc55 is described below commit 729cc5538561ca1b4540b6de5a38eec1bbd6c779 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Dec 4 17:20:15 2019 +0100 Adds support for ElasticSearch and include integration tests --- core/pom.xml | 4 + parent/pom.xml | 14 +- tests/pom.xml | 7 + .../apache/camel/kafkaconnector/TestCommon.java | 10 ++ .../clients/elasticsearch/ElasticSearchClient.java | 173 +++++++++++++++++++++ .../CamelElasticSearchIndexPropertyFactory.java | 56 +++++++ .../CamelElasticSearchPropertyFactory.java | 75 +++++++++ .../CamelSinkElasticSearchITCase.java | 146 +++++++++++++++++ .../ConnectRecordValueToMapTransformer.java | 70 +++++++++ tests/src/test/resources/log4j2.properties | 4 + 10 files changed, 558 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index 1763235..ab2e4d8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -90,6 +90,10 @@ <artifactId>camel-http</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-elasticsearch-rest</artifactId> + </dependency> <!-- Kafka --> <dependency> diff --git a/parent/pom.xml b/parent/pom.xml index b719242..ce526c8 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -149,6 +149,13 @@ <artifactId>camel-http</artifactId> <version>${camel.version}</version> </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-elasticsearch-rest</artifactId> + <version>${camel.version}</version> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-cbor</artifactId> @@ -241,6 +248,12 @@ <version>${version.testcontainers}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>elasticsearch</artifactId> + <version>${version.testcontainers}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.camel</groupId> @@ -254,7 +267,6 @@ <version>${camel.version}</version> <scope>test</scope> </dependency> - </dependencies> </dependencyManagement> <build> diff --git a/tests/pom.xml b/tests/pom.xml index 2255c52..b40be9f 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -136,6 +136,13 @@ <artifactId>localstack</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>elasticsearch</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java index 706ceb3..05719e6 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/TestCommon.java @@ -53,6 +53,16 @@ public final class TestCommon { */ public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream"; + /** + * The default ElasticSearch cluster name for usage during the tests + */ + public static final String DEFAULT_ELASTICSEARCH_CLUSTER = "docker-cluster"; + + /** + * The default ElasticSearch index for usage during the tests + */ + public static final String DEFAULT_ELASTICSEARCH_INDEX = "ckc-index"; + private static final Logger LOG = LoggerFactory.getLogger(TestCommon.class); private TestCommon() { diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java new file mode 100644 index 0000000..93ccbbb --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.clients.elasticsearch; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.function.Predicate; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticSearchClient { + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchClient.class); + + private final RestHighLevelClient client; + private final String index; + + public ElasticSearchClient(int port, String index) { + client = new RestHighLevelClient( + RestClient.builder( + new HttpHost("localhost", port, "http"))); + + this.index = index; + } + + public boolean indexExists() { + try { + GetIndexRequest indexRequest = new GetIndexRequest(index); + + return client.indices().exists(indexRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + /* + It may return if failed to parse the response, on timeout or no response from the ES instance. + Assuming it is more likely to timeout or provide no reply either the during the start up or + on overloaded CI environments, we log the I/O error and try again + */ + LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e); + } + + return false; + } + + public SearchHits getData() { + try { + SearchRequest searchRequest = new SearchRequest(index); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + + searchRequest.source(searchSourceBuilder); + + SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT); + + return response.getHits(); + + } catch (IOException e) { + /* + It may return if failed to parse the response, on timeout or no response from the ES instance. + Assuming it is more likely to timeout or provide no reply either the during the start up or + on overloaded CI environments, we log the I/O error and try again + */ + LOG.error("I/O error trying to query for index existence: {}", e.getMessage(), e); + } catch (Throwable e) { + LOG.error("Unhandled error trying to query for index existence: {}", e.getMessage(), e); + } + + return null; + } + + private boolean hasData(int expect) { + SearchHits searchHits = getData(); + + if (searchHits == null) { + LOG.debug("There are not search hit to return"); + + return false; + } + + SearchHit[] hits = searchHits.getHits(); + if (hits == null) { + LOG.debug("Empty data set"); + + return false; + } + + int count = hits.length; + + if (count != expect) { + LOG.debug("Not enough records: {} available, but {} expected", count, expect); + + return false; + } + + return true; + } + + private <T> void waitFor(Predicate<T> resourceCheck, T payload) { + boolean state; + int retries = 30; + int waitTime = 1000; + do { + try { + state = resourceCheck.test(payload); + + if (!state) { + LOG.debug("The resource is not yet available. Waiting {} seconds before retrying", + TimeUnit.MILLISECONDS.toSeconds(waitTime)); + retries--; + Thread.sleep(waitTime); + } + } catch (InterruptedException e) { + break; + } + + } while (!state && retries > 0); + } + + private void waitFor(BooleanSupplier resourceCheck) { + boolean state; + int retries = 30; + int waitTime = 1000; + do { + try { + state = resourceCheck.getAsBoolean(); + + if (!state) { + LOG.debug("The resource is not yet available. Waiting {} seconds before retrying", + TimeUnit.MILLISECONDS.toSeconds(waitTime)); + retries--; + Thread.sleep(waitTime); + } + } catch (InterruptedException e) { + break; + } + + } while (!state && retries > 0); + } + + public void waitForIndex() { + waitFor(this::indexExists); + } + + public void waitForData(int expect) { + waitFor(this::hasData, expect); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java new file mode 100644 index 0000000..500a0bc --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchIndexPropertyFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sink.elasticsearch; + +import java.util.Properties; + +import org.apache.kafka.connect.runtime.ConnectorConfig; + +public class CamelElasticSearchIndexPropertyFactory extends CamelElasticSearchPropertyFactory { + private final String index; + private final String transformerKey; + + + CamelElasticSearchIndexPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress, + String index, String transformerKey) { + super(tasksMax, topic, clusterName, hostAddress, index); + this.index = index; + this.transformerKey = transformerKey; + } + + + @Override + public Properties getProperties() { + Properties connectorProps = super.getProperties(); + + connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "ElasticSearchTransformer"); + connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.type", + "org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer"); + connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".ElasticSearchTransformer.key", + transformerKey); + + String queueUrl = "elasticsearch-rest://" + getClusterName() + "?hostAddresses=" + getHostAddress() + + "&operation=Index" + + "&indexName=" + index; + + connectorProps.put("camel.sink.url", queueUrl); + + + return connectorProps; + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java new file mode 100644 index 0000000..b5bb19e --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelElasticSearchPropertyFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sink.elasticsearch; + +import java.util.Properties; + +import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; +import org.apache.kafka.connect.runtime.ConnectorConfig; + +public class CamelElasticSearchPropertyFactory implements ConnectorPropertyFactory { + private final int tasksMax; + private final String topic; + private final String clusterName; + private final String hostAddress; + private final String index; + + + CamelElasticSearchPropertyFactory(int tasksMax, String topic, String clusterName, String hostAddress, String index) { + this.tasksMax = tasksMax; + this.topic = topic; + this.clusterName = clusterName; + this.hostAddress = hostAddress; + this.index = index; + } + + protected int getTasksMax() { + return tasksMax; + } + + protected String getTopic() { + return topic; + } + + protected String getClusterName() { + return clusterName; + } + + protected String getHostAddress() { + return hostAddress; + } + + protected String getIndex() { + return index; + } + + @Override + public Properties getProperties() { + Properties connectorProps = new Properties(); + connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelElasticSearchSinkConnector"); + connectorProps.put("tasks.max", String.valueOf(tasksMax)); + + connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector"); + connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + + connectorProps.put("topics", topic); + + return connectorProps; + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java new file mode 100644 index 0000000..600c00e --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sink.elasticsearch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.ContainerUtil; +import org.apache.camel.kafkaconnector.KafkaConnectRunner; +import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient; +import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import static org.junit.Assert.fail; + +public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class); + // This is required in order to use the Open Source one by default + private static final String ELASTIC_SEARCH_CONTAINER = "docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2"; + + private static final int ELASTIC_SEARCH_PORT = 9200; + + @Rule + public ElasticsearchContainer elasticsearch = new ElasticsearchContainer(ELASTIC_SEARCH_CONTAINER); + + private KafkaConnectRunner kafkaConnectRunner; + private ElasticSearchClient client; + + private final int expect = 10; + private int received; + private final String transformKey = "index-test"; + + @Before + public void setUp() { + ContainerUtil.waitForHttpInitialization(elasticsearch, elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT)); + + final String elasticSearchInstance = elasticsearch + .getHttpHostAddress(); + + LOG.info("ElasticSearch instance running at {}", elasticSearchInstance); + + String topic = TestCommon.getDefaultTestTopic(this.getClass()); + CamelElasticSearchPropertyFactory testProperties = new CamelElasticSearchIndexPropertyFactory(1, topic, + TestCommon.DEFAULT_ELASTICSEARCH_CLUSTER, + elasticSearchInstance, TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey); + + kafkaConnectRunner = getKafkaConnectRunner(); + kafkaConnectRunner.getConnectorPropertyProducers().add(testProperties); + + client = new ElasticSearchClient(elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT), + TestCommon.DEFAULT_ELASTICSEARCH_INDEX); + } + + private void putRecords(CountDownLatch latch) { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + try { + for (int i = 0; i < expect; i++) { + try { + kafkaClient.produce(TestCommon.getDefaultTestTopic(this.getClass()), "test"); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } + } + } finally { + latch.countDown(); + } + + } + + private void verifyHit(SearchHit searchHit) { + String source = searchHit.getSourceAsString(); + + Assert.assertTrue(source != null); + Assert.assertFalse(source.isEmpty()); + + // TODO: this is not enough, we need to parse the json and check the key itself + Assert.assertTrue(source.contains(transformKey)); + + LOG.debug("Search hit: {} ", searchHit.getSourceAsString()); + received++; + } + + + + @Test(timeout = 90000) + public void testIndexOperation() { + try { + CountDownLatch latch = new CountDownLatch(2); + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> kafkaConnectRunner.run(latch)); + service.submit(() -> putRecords(latch)); + + latch.await(30, TimeUnit.SECONDS); + + LOG.debug("Waiting for indices"); + + client.waitForIndex(); + + LOG.debug("Waiting for data"); + client.waitForData(expect); + + SearchHits hits = client.getData(); + + hits.forEach(this::verifyHit); + Assert.assertEquals("Did not receive the same amount of messages sent", expect, received); + + LOG.debug("Created the consumer ... About to receive messages"); + } catch (Exception e) { + LOG.error("ElasticSearch test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } finally { + kafkaConnectRunner.stop(); + } + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java new file mode 100644 index 0000000..44ef5ac --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/transforms/ConnectRecordValueToMapTransformer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sink.elasticsearch.transforms; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.kafkaconnector.utils.SchemaHelper; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +public class ConnectRecordValueToMapTransformer<R extends ConnectRecord<R>> implements Transformation<R> { + public static final String FIELD_KEY_CONFIG = "key"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Transforms String-based content from Kafka into a map"); + + private String key; + + @Override + public R apply(R r) { + Map<String, Object> targetMap = new HashMap<>(); + + Object value = r.value(); + + targetMap.put(key, value); + return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), + SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp()); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map<String, ?> map) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map); + + this.key = config.getString(FIELD_KEY_CONFIG); + + if (this.key == null) { + throw new ConfigException("The ElasticSearch transformer requires a 'key'"); + } + } +} diff --git a/tests/src/test/resources/log4j2.properties b/tests/src/test/resources/log4j2.properties index eefb917..b3e5e53 100644 --- a/tests/src/test/resources/log4j2.properties +++ b/tests/src/test/resources/log4j2.properties @@ -35,3 +35,7 @@ logger.reflections.appenderRef.file.ref = file logger.camel-aws.name = org.apache.camel.component.aws logger.camel-aws.level = WARN logger.camel-aws.appenderRef.file.ref = file + +logger.camel-elasticsearch.name = org.apache.camel.component.elasticsearch +logger.camel-elasticsearch.level = DEBUG +logger.camel-elasticsearch.appenderRef.file.ref = file