This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7530d64 [tests] improve connector related integration tests (#2587) 7530d64 is described below commit 7530d64a679a0783122b18058c1148c89c0fee0a Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Sep 18 14:50:51 2018 -0700 [tests] improve connector related integration tests (#2587) *Motivation* With more and more connectors being added, it becomes expensive to start all external services at the beginning. *Changes* - refactor the connector testing framework to start external services before methods - fix kafka, cassandra and mysql connectors --- distribution/io/src/assemble/io.xml | 6 + .../broker/service/PersistentQueueE2ETest.java | 60 ---------- .../functions/instance/JavaInstanceRunnable.java | 3 +- .../pulsar/functions/source/TopicSchema.java | 3 + ...rchAbstractSink.java => ElasticSearchSink.java} | 9 +- .../io/elasticsearch/ElasticSearchStringSink.java | 35 ------ .../resources/META-INF/services/pulsar-io.yaml | 4 +- .../io/elasticsearch/ElasticSearchSinkTests.java | 4 +- .../apache/pulsar/io/kafka/KafkaAbstractSink.java | 6 +- .../{KafkaStringSink.java => KafkaBytesSink.java} | 25 ++++- .../resources/META-INF/services/pulsar-io.yaml | 2 +- site2/docs/io-quickstart.md | 2 +- .../version-2.1.0-incubating/io-quickstart.md | 2 +- .../integration/functions/PulsarFunctionsTest.java | 39 +++++-- .../functions/utils/CommandGenerator.java | 2 +- .../integration/io/CassandraSinkArchiveTester.java | 121 --------------------- .../tests/integration/io/CassandraSinkTester.java | 45 +++++--- .../integration/io/ElasticSearchSinkTester.java | 34 +++--- .../tests/integration/io/HdfsSinkTester.java | 22 ++-- .../tests/integration/io/JdbcSinkTester.java | 38 ++++--- .../tests/integration/io/KafkaSinkTester.java | 28 +++-- .../tests/integration/io/KafkaSourceTester.java | 12 +- .../pulsar/tests/integration/io/SinkTester.java | 25 ++++- .../pulsar/tests/integration/io/SourceTester.java | 4 +- 
.../tests/integration/suites/PulsarTestSuite.java | 50 --------- .../integration/topologies/PulsarCluster.java | 17 +++ 26 files changed, 212 insertions(+), 386 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 08ff859..a509e19 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -92,5 +92,11 @@ <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> + + <file> + <source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source> + <outputDirectory>connectors</outputDirectory> + <fileMode>644</fileMode> + </file> </files> </assembly> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 2937ca0..fb7a76c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -233,66 +233,6 @@ public class PersistentQueueE2ETest extends BrokerTestBase { deleteTopic(topicName); } - @Test - public void testConsumersWithDifferentPermits() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/shared-topic4"; - final String subName = "sub4"; - final int numMsgs = 10000; - - final AtomicInteger msgCountConsumer1 = new AtomicInteger(0); - final AtomicInteger msgCountConsumer2 = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(numMsgs); - - int recvQ1 = 10; - Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1) - .messageListener((consumer, msg) -> { - msgCountConsumer1.incrementAndGet(); - try { - consumer.acknowledge(msg); - latch.countDown(); - } catch (PulsarClientException e) { - 
fail("Should not fail"); - } - }).subscribe(); - - int recvQ2 = 1; - Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2) - .messageListener((consumer, msg) -> { - msgCountConsumer2.incrementAndGet(); - try { - consumer.acknowledge(msg); - latch.countDown(); - } catch (PulsarClientException e) { - fail("Should not fail"); - } - }).subscribe(); - - List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .enableBatching(false) - .maxPendingMessages(numMsgs + 1) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - for (int i = 0; i < numMsgs; i++) { - String message = "msg-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - producer.close(); - - latch.await(5, TimeUnit.SECONDS); - - assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1); - assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1); - - consumer1.close(); - consumer2.close(); - - deleteTopic(topicName); - } - // this test is good to have to see the distribution, but every now and then it gets slightly different than the // expected numbers. 
keeping this disabled to not break the build, but nevertheless this gives good insight into // how the round robin distribution algorithm is behaving diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4ba7340..b3f86ea 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -146,7 +146,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); ThreadContext.put("instance", instanceConfig.getInstanceId()); - log.info("Starting Java Instance {}", instanceConfig.getFunctionDetails().getName()); + log.info("Starting Java Instance {} : \n Details = {}", + instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); // start the function thread loadJars(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 76375dc..db9e880 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -78,6 +78,9 @@ public class TopicSchema { private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) { if (GenericRecord.class.isAssignableFrom(clazz)) { return SchemaType.AUTO; + } else if (byte[].class.equals(clazz)) { + // if function uses bytes, we should ignore + return SchemaType.NONE; } else { Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join(); if (schema.isPresent()) { diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java similarity index 94% rename from pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java rename to pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index 3760d40..86546f3 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -51,7 +51,7 @@ import org.elasticsearch.common.xcontent.XContentType; * Users need to implement extractKeyValue function to use this sink. * This class assumes that the input will be JSON documents */ -public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> { +public class ElasticSearchSink implements Sink<byte[]> { protected static final String DOCUMENT = "doc"; @@ -74,7 +74,7 @@ public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> { @Override public void write(Record<byte[]> record) { - KeyValue<K, V> keyValue = extractKeyValue(record); + KeyValue<String, byte[]> keyValue = extractKeyValue(record); IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName()); indexRequest.type(DOCUMENT); indexRequest.source(keyValue.getValue(), XContentType.JSON); @@ -91,7 +91,10 @@ public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> { } } - public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record); + public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) { + String key = record.getKey().orElseGet(null); + return new KeyValue<>(key, record.getValue()); + } private void createIndexIfNeeded() throws IOException { GetIndexRequest request = new GetIndexRequest(); diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java deleted file mode 100644 index 6cfa03d..0000000 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.elasticsearch; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; - -/** - * Concrete ElasticSearch sink. 
- * This class assumes that the input will be JSON documents - */ -public class ElasticSearchStringSink extends ElasticSearchAbstractSink<String, String> { - - @Override - public KeyValue<String, String> extractKeyValue(Record<byte[]> record) { - String key = record.getKey().orElseGet(() -> new String(record.getValue())); - return new KeyValue<>(key, new String(record.getValue())); - } -} diff --git a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml index 0307516..97789e9 100644 --- a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml @@ -17,6 +17,6 @@ # under the License. # -name: Elastic Search +name: elastic_search description: Writes data into Elastic Search -sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink +sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index ea2b886..f188829 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -56,7 +56,7 @@ public class ElasticSearchSinkTests { @Mock protected SinkContext mockSinkContext; protected Map<String, Object> map; - protected ElasticSearchStringSink sink; + protected ElasticSearchSink sink; @BeforeClass public static final void init() { @@ -71,7 +71,7 @@ public class ElasticSearchSinkTests { public final void setUp() throws Exception { map = new HashMap<String, Object> (); map.put("elasticSearchUrl", "http://localhost:9200"); - sink = new ElasticSearchStringSink(); + sink = new 
ElasticSearchSink(); mockRecord = mock(Record.class); mockSinkContext = mock(SinkContext.class); diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java index a92a368..50ce4b9 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java @@ -70,6 +70,10 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> { } } + protected Properties beforeCreateProducer(Properties props) { + return props; + } + @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { kafkaSinkConfig = KafkaSinkConfig.load(config); @@ -89,7 +93,7 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass()); - producer = new KafkaProducer<>(props); + producer = new KafkaProducer<>(beforeCreateProducer(props)); log.info("Kafka sink started."); } diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java similarity index 50% rename from pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java rename to pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java index 89e3e7f..9ce2bdc 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java @@ -19,16 +19,31 @@ package org.apache.pulsar.io.kafka; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import 
org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.KeyValue; /** - * Kafka sink that treats incoming messages on the input topic as Strings - * and write identical key/value pairs. + * Kafka sink should treats incoming messages as pure bytes. So we don't + * apply schema into it. */ -public class KafkaStringSink extends KafkaAbstractSink<String, String> { +@Slf4j +public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> { + + @Override + protected Properties beforeCreateProducer(Properties props) { + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + log.info("Created kafka producer config : {}", props); + return props; + } + @Override - public KeyValue<String, String> extractKeyValue(Record<byte[]> record) { - return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue())); + public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) { + return new KeyValue<>(record.getKey().orElse(null), record.getValue()); } } \ No newline at end of file diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml index a7bc813..7afc154 100644 --- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,4 +20,4 @@ name: kafka description: Kafka source and sink connector sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource -sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink +sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md index 8b8cfd3..4a48bc0 100644 --- a/site2/docs/io-quickstart.md +++ 
b/site2/docs/io-quickstart.md @@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors Example output: ```json -[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connect [...] +[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connecto [...] 
``` If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`, diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md index 7b21379..afa8e31 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md @@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors Example output: ```json -[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connect [...] +[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connecto [...] 
``` If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 5f11eef..e9ea618 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.tests.integration.io.*; import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testcontainers.containers.GenericContainer; import org.testng.annotations.Test; /** @@ -62,17 +63,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { @Test public void testKafkaSink() throws Exception { - testSink(new KafkaSinkTester(), true); + testSink(new KafkaSinkTester(), true, new KafkaSourceTester()); } @Test public void testCassandraSink() throws Exception { - testSink(new CassandraSinkTester(), true); + testSink(CassandraSinkTester.createTester(true), true); } @Test public void testCassandraArchiveSink() throws Exception { - testSink(new CassandraSinkArchiveTester(), false); + testSink(CassandraSinkTester.createTester(false), false); } @Test(enabled = false) @@ -91,8 +92,31 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } private void testSink(SinkTester tester, boolean builtin) throws Exception { - tester.findSinkServiceContainer(pulsarCluster.getExternalServices()); + tester.startServiceContainer(pulsarCluster); + try { + runSinkTester(tester, builtin); + } finally { + 
tester.stopServiceContainer(pulsarCluster); + } + } + + private <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester, + boolean builtinSink, + SourceTester<ServiceContainerT> sourceTester) + throws Exception { + ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster); + try { + runSinkTester(sinkTester, builtinSink); + if (null != sourceTester) { + sourceTester.setServiceContainer(serviceContainer); + testSource(sourceTester); + } + } finally { + sinkTester.stopServiceContainer(pulsarCluster); + } + } + private void runSinkTester(SinkTester tester, boolean builtin) throws Exception { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String inputTopicName = "test-sink-connector-" @@ -357,14 +381,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // Source Test // - @Test - public void testKafkaSource() throws Exception { - testSource(new KafkaSourceTester()); - } - private void testSource(SourceTester tester) throws Exception { - tester.findSourceServiceContainer(pulsarCluster.getExternalServices()); - final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "test-source-connector-" diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 6578d08..6f4d012 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -161,7 +161,7 @@ public class CommandGenerator { } public String generateUpdateFunctionCommand(String codeFile) { - StringBuilder commandBuilder = new 
StringBuilder("PULSAR_MEM=-Xmx1024m "); + StringBuilder commandBuilder = new StringBuilder(); if (adminUrl == null) { commandBuilder.append("/pulsar/bin/pulsar-admin functions update"); } else { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java deleted file mode 100644 index 86c7689..0000000 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.tests.integration.io; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.testcontainers.containers.GenericContainer; - -import java.util.List; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkState; -import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * A tester for testing cassandra sink submitted as an archive. - */ -@Slf4j -public class CassandraSinkArchiveTester extends SinkTester { - - private static final String NAME = "cassandra"; - - private static final String ROOTS = "cassandra"; - private static final String KEY = "key"; - private static final String COLUMN = "col"; - - private final String keySpace; - private final String tableName; - - private CassandraContainer cassandraCluster; - - private Cluster cluster; - private Session session; - - public CassandraSinkArchiveTester() { - super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink"); - - String suffix = randomName(8) + "_" + System.currentTimeMillis(); - this.keySpace = "keySpace_" + suffix; - this.tableName = "tableName_" + suffix; - - sinkConfig.put("roots", ROOTS); - sinkConfig.put("keyspace", keySpace); - sinkConfig.put("columnFamily", tableName); - sinkConfig.put("keyname", KEY); - sinkConfig.put("columnName", COLUMN); - } - - @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof CassandraContainer, - "No kafka service found in the cluster"); - - 
this.cassandraCluster = (CassandraContainer) container; - } - - @Override - public void prepareSink() { - // build the sink - cluster = Cluster.builder() - .addContactPoint("localhost") - .withPort(cassandraCluster.getCassandraPort()) - .build(); - - // connect to the cluster - session = cluster.connect(); - log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort()); - - String createKeySpace = - "CREATE KEYSPACE " + keySpace - + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; "; - log.info(createKeySpace); - session.execute(createKeySpace); - session.execute("USE " + keySpace); - - String createTable = "CREATE TABLE " + tableName - + "(" + KEY + " text PRIMARY KEY, " - + COLUMN + " text);"; - log.info(createTable); - session.execute(createTable); - } - - @Override - public void validateSinkResult(Map<String, String> kvs) { - String query = "SELECT * FROM " + tableName + ";"; - ResultSet result = session.execute(query); - List<Row> rows = result.all(); - assertEquals(kvs.size(), rows.size()); - for (Row row : rows) { - String key = row.getString(KEY); - String value = row.getString(COLUMN); - - String expectedValue = kvs.get(key); - assertNotNull(expectedValue); - assertEquals(expectedValue, value); - } - } -} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java index c9d3e5a..3309358 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java @@ -24,12 +24,11 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.testcontainers.containers.GenericContainer; 
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import java.util.List; import java.util.Map; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -38,7 +37,15 @@ import static org.junit.Assert.assertNotNull; * A tester for testing cassandra sink. */ @Slf4j -public class CassandraSinkTester extends SinkTester { +public class CassandraSinkTester extends SinkTester<CassandraContainer> { + + public static CassandraSinkTester createTester(boolean builtin) { + if (builtin) { + return new CassandraSinkTester(builtin); + } else { + return new CassandraSinkTester(); + } + } private static final String NAME = "cassandra"; @@ -49,13 +56,11 @@ public class CassandraSinkTester extends SinkTester { private final String keySpace; private final String tableName; - private CassandraContainer cassandraCluster; - private Cluster cluster; private Session session; - public CassandraSinkTester() { - super(SinkType.CASSANDRA); + private CassandraSinkTester() { + super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink"); String suffix = randomName(8) + "_" + System.currentTimeMillis(); this.keySpace = "keySpace_" + suffix; @@ -68,13 +73,23 @@ public class CassandraSinkTester extends SinkTester { sinkConfig.put("columnName", COLUMN); } - @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof CassandraContainer, - "No kafka service found in the cluster"); + private CassandraSinkTester(boolean builtin) { + super(NAME, SinkType.CASSANDRA); + + String suffix = randomName(8) + "_" + System.currentTimeMillis(); + this.keySpace = "keySpace_" + suffix; + this.tableName = 
"tableName_" + suffix; - this.cassandraCluster = (CassandraContainer) container; + sinkConfig.put("roots", ROOTS); + sinkConfig.put("keyspace", keySpace); + sinkConfig.put("columnFamily", tableName); + sinkConfig.put("keyname", KEY); + sinkConfig.put("columnName", COLUMN); + } + + @Override + protected CassandraContainer createSinkService(PulsarCluster cluster) { + return new CassandraContainer(cluster.getClusterName()); } @Override @@ -82,12 +97,12 @@ public class CassandraSinkTester extends SinkTester { // build the sink cluster = Cluster.builder() .addContactPoint("localhost") - .withPort(cassandraCluster.getCassandraPort()) + .withPort(serviceContainer.getCassandraPort()) .build(); // connect to the cluster session = cluster.connect(); - log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort()); + log.info("Connecting to cassandra cluster at localhost:{}", serviceContainer.getCassandraPort()); String createKeySpace = "CREATE KEYSPACE " + keySpace diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java index 0effc8e..eee208e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java @@ -18,57 +18,57 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Map; -import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.elasticsearch.action.search.SearchRequest; import 
org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.containers.GenericContainer; -public class ElasticSearchSinkTester extends SinkTester { - +public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> { + private RestHighLevelClient elasticClient; public ElasticSearchSinkTester() { - super(SinkType.ELASTIC_SEARCH); + super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH); - sinkConfig.put("elasticSearchUrl", "http://localhost:9200"); + sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200"); sinkConfig.put("indexName", "test-index"); } + @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices) { - GenericContainer<?> container = externalServices.get(ElasticSearchContainer.NAME); - checkState(container instanceof ElasticSearchContainer, - "No ElasticSearch service found in the cluster"); + protected ElasticSearchContainer createSinkService(PulsarCluster cluster) { + return new ElasticSearchContainer(cluster.getClusterName()); } @Override public void prepareSink() throws Exception { - RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); + RestClientBuilder builder = RestClient.builder( + new HttpHost( + "localhost", + serviceContainer.getMappedPort(9200), + "http")); elasticClient = new RestHighLevelClient(builder); } @Override public void validateSinkResult(Map<String, String> kvs) { - SearchRequest searchRequest = new SearchRequest("test-index"); searchRequest.types("doc"); try { - Header headers = null; - SearchResponse searchResult = elasticClient.search(searchRequest, headers); - assertTrue(searchResult.getHits().getTotalHits() > 0); + SearchResponse searchResult = elasticClient.search(searchRequest); + assertTrue(searchResult.getHits().getTotalHits() > 0, 
searchResult.toString()); } catch (Exception e) { - e.printStackTrace(); + fail("Encountered exception on validating elastic search results", e); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java index 46c5f24..957b93a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java @@ -21,18 +21,14 @@ package org.apache.pulsar.tests.integration.io; import java.util.Map; import org.apache.pulsar.tests.integration.containers.HdfsContainer; -import org.testcontainers.containers.GenericContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import static com.google.common.base.Preconditions.checkState; - -public class HdfsSinkTester extends SinkTester { +public class HdfsSinkTester extends SinkTester<HdfsContainer> { private static final String NAME = "HDFS"; - private HdfsContainer hdfsCluster; - public HdfsSinkTester() { - super(SinkType.HDFS); + super(NAME, SinkType.HDFS); // TODO How do I get the core-site.xml, and hdfs-site.xml files from the container? 
sinkConfig.put("hdfsConfigResources", ""); @@ -40,20 +36,18 @@ public class HdfsSinkTester extends SinkTester { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster"); - this.hdfsCluster = (HdfsContainer) container; + protected HdfsContainer createSinkService(PulsarCluster cluster) { + return new HdfsContainer(cluster.getClusterName()); } @Override public void prepareSink() throws Exception { // Create the test directory - hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing"); - hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing"); + serviceContainer.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing"); + serviceContainer.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing"); // Execute all future commands as the "tester" user - hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester"); + serviceContainer.execInContainer("export HADOOP_USER_NAME=tester"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java index 7c14ba9..72d9b01 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java @@ -18,22 +18,23 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; +import com.github.dockerjava.api.command.CreateContainerCmd; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import 
java.sql.ResultSet; import java.util.Map; +import java.util.function.Consumer; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.testcontainers.containers.GenericContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.MySQLContainer; /** @@ -41,7 +42,7 @@ import org.testcontainers.containers.MySQLContainer; * This will use MySql as DB server */ @Slf4j -public class JdbcSinkTester extends SinkTester { +public class JdbcSinkTester extends SinkTester<MySQLContainer> { /** * A Simple class to test jdbc class, @@ -57,14 +58,14 @@ public class JdbcSinkTester extends SinkTester { } private static final String NAME = "jdbc"; + private static final String MYSQL = "mysql"; - private MySQLContainer mySQLContainer; private AvroSchema<Foo> schema = AvroSchema.of(Foo.class); private String tableName = "test"; private Connection connection; public JdbcSinkTester() { - super(SinkType.JDBC); + super(NAME, SinkType.JDBC); // container default value is test sinkConfig.put("userName", "test"); @@ -79,21 +80,28 @@ public class JdbcSinkTester extends SinkTester { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get("mysql"); - checkState(container instanceof MySQLContainer, - "No MySQL service found in the cluster"); - - this.mySQLContainer = (MySQLContainer) container; - log.info("find sink service container: {}", mySQLContainer.getContainerName()); + protected MySQLContainer createSinkService(PulsarCluster cluster) { + return (MySQLContainer) new MySQLContainer() + .withUsername("test") + .withPassword("test") + .withDatabaseName("test") + .withNetworkAliases(MYSQL) + .withCreateContainerCmdModifier(new Consumer<CreateContainerCmd>() { + @Override + public void 
accept(CreateContainerCmd createContainerCmd) { + createContainerCmd + .withName(MYSQL) + .withHostName(cluster.getClusterName() + "-" + MYSQL); + } + }); } @Override public void prepareSink() throws Exception { - String jdbcUrl = mySQLContainer.getJdbcUrl(); + String jdbcUrl = serviceContainer.getJdbcUrl(); // we need set mysql server address in cluster network. - sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test"); - String driver = mySQLContainer.getDriverClassName(); + sinkConfig.put("jdbcUrl", "jdbc:mysql://" + MYSQL + ":3306/test"); + String driver = serviceContainer.getDriverClassName(); Class.forName(driver); connection = DriverManager.getConnection(jdbcUrl, "test", "test"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java index ff79e1a..6713cc1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -32,8 +31,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.Container.ExecResult; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import 
org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -41,18 +40,15 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; * A tester for testing kafka sink. */ @Slf4j -public class KafkaSinkTester extends SinkTester { +public class KafkaSinkTester extends SinkTester<KafkaContainer> { private static final String NAME = "kafka"; private final String kafkaTopicName; - - private KafkaContainer kafkaContainer; - private KafkaConsumer<String, String> kafkaConsumer; public KafkaSinkTester() { - super(SinkType.KAFKA); + super(NAME, SinkType.KAFKA); String suffix = randomName(8) + "_" + System.currentTimeMillis(); this.kafkaTopicName = "kafka_sink_topic_" + suffix; @@ -64,17 +60,19 @@ public class KafkaSinkTester extends SinkTester { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof KafkaContainer, - "No kafka service found in the cluster"); - - this.kafkaContainer = (KafkaContainer) container; + protected KafkaContainer createSinkService(PulsarCluster cluster) { + final String kafkaServiceName = NAME; + return new KafkaContainer() + .withEmbeddedZookeeper() + .withNetworkAliases(kafkaServiceName) + .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd + .withName(kafkaServiceName) + .withHostName(cluster.getClusterName() + "-" + kafkaServiceName)); } @Override public void prepareSink() throws Exception { - ExecResult execResult = kafkaContainer.execInContainer( + ExecResult execResult = serviceContainer.execInContainer( "/usr/bin/kafka-topics", "--create", "--zookeeper", @@ -91,7 +89,7 @@ public class KafkaSinkTester extends SinkTester { kafkaConsumer = new KafkaConsumer<>( ImmutableMap.of( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, 
"sink-test-" + randomName(8), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ), diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java index 4928f00..cee17b1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.testng.Assert.assertTrue; @@ -35,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.testcontainers.containers.Container.ExecResult; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -43,7 +41,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; * A tester for testing kafka source. 
*/ @Slf4j -public class KafkaSourceTester extends SourceTester { +public class KafkaSourceTester extends SourceTester<KafkaContainer> { private static final String NAME = "kafka"; @@ -68,12 +66,8 @@ public class KafkaSourceTester extends SourceTester { } @Override - public void findSourceServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof KafkaContainer, - "No kafka service found in the cluster"); - - this.kafkaContainer = (KafkaContainer) container; + public void setServiceContainer(KafkaContainer container) { + this.kafkaContainer = container; } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java index d2917e6..2dd0759 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java @@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io; import java.util.Map; import lombok.Getter; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.GenericContainer; import org.testng.collections.Maps; @@ -28,7 +29,7 @@ import org.testng.collections.Maps; * A tester used for testing a specific sink. 
*/ @Getter -public abstract class SinkTester { +public abstract class SinkTester<ServiceContainerT extends GenericContainer> { public enum SinkType { UNDEFINED, @@ -39,19 +40,23 @@ public abstract class SinkTester { ELASTIC_SEARCH } + protected final String networkAlias; protected final SinkType sinkType; protected final String sinkArchive; protected final String sinkClassName; protected final Map<String, Object> sinkConfig; + protected ServiceContainerT serviceContainer; - public SinkTester(SinkType sinkType) { + public SinkTester(String networkAlias, SinkType sinkType) { + this.networkAlias = networkAlias; this.sinkType = sinkType; this.sinkArchive = null; this.sinkClassName = null; this.sinkConfig = Maps.newHashMap(); } - public SinkTester(String sinkArchive, String sinkClassName) { + public SinkTester(String networkAlias, String sinkArchive, String sinkClassName) { + this.networkAlias = networkAlias; this.sinkType = SinkType.UNDEFINED; this.sinkArchive = sinkArchive; this.sinkClassName = sinkClassName; @@ -62,7 +67,19 @@ public abstract class SinkTester { return Schema.STRING; } - public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices); + protected abstract ServiceContainerT createSinkService(PulsarCluster cluster); + + public ServiceContainerT startServiceContainer(PulsarCluster cluster) { + this.serviceContainer = createSinkService(cluster); + cluster.startService(networkAlias, serviceContainer); + return serviceContainer; + } + + public void stopServiceContainer(PulsarCluster cluster) { + if (null != serviceContainer) { + cluster.stopService(networkAlias, serviceContainer); + } + } public SinkType sinkType() { return sinkType; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java index dc58f2f..f1feb70 100644 --- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java @@ -27,7 +27,7 @@ import org.testng.collections.Maps; * A tester used for testing a specific source. */ @Getter -public abstract class SourceTester { +public abstract class SourceTester<ServiceContainerT extends GenericContainer> { protected final String sourceType; protected final Map<String, Object> sourceConfig; @@ -37,7 +37,7 @@ public abstract class SourceTester { this.sourceConfig = Maps.newHashMap(); } - public abstract void findSourceServiceContainer(Map<String, GenericContainer<?>> externalServices); + public abstract void setServiceContainer(ServiceContainerT serviceContainer); public String sourceType() { return sourceType; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index b4a6b83..20b9da0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -18,19 +18,10 @@ */ package org.apache.pulsar.tests.integration.suites; -import java.util.Map; -import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer; -import org.apache.pulsar.tests.integration.containers.HdfsContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder; import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.MySQLContainer; import org.testng.ITest; import org.testng.annotations.AfterSuite; 
import org.testng.annotations.BeforeSuite; -import org.testng.collections.Maps; public class PulsarTestSuite extends PulsarClusterTestBase implements ITest { @@ -47,47 +38,6 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest { } @Override - protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpecBuilder specBuilder) { - PulsarClusterSpecBuilder builder = super.beforeSetupCluster(clusterName, specBuilder); - - // start functions - - // register external services - Map<String, GenericContainer<?>> externalServices = Maps.newHashMap(); - - final String kafkaServiceName = "kafka"; - externalServices.put( - kafkaServiceName, - new KafkaContainer() - .withEmbeddedZookeeper() - .withNetworkAliases(kafkaServiceName) - .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd - .withName(kafkaServiceName) - .withHostName(clusterName + "-" + kafkaServiceName))); - - final String cassandraServiceName = "cassandra"; - externalServices.put( - cassandraServiceName, - new CassandraContainer(clusterName)); - - // use mySQL for jdbc test - final String jdbcServiceName = "mysql"; - externalServices.put( - jdbcServiceName, - new MySQLContainer() - .withExposedPorts(3306)); - - externalServices.put( - ElasticSearchContainer.NAME, - new ElasticSearchContainer(ElasticSearchContainer.NAME) - .withExposedPorts(9200)); - - builder = builder.externalServices(externalServices); - - return builder; - } - - @Override public String getTestName() { return "pulsar-test-suite"; } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index f78097b..af60712 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java 
@@ -209,6 +209,23 @@ public class PulsarCluster { } } + public void startService(String networkAlias, + GenericContainer<?> serviceContainer) { + log.info("Starting external service {} ...", networkAlias); + serviceContainer.withNetwork(network); + serviceContainer.withNetworkAliases(networkAlias); + serviceContainer.start(); + log.info("Successfully start external service {}", networkAlias); + } + + public void stopService(String networkAlias, + GenericContainer<?> serviceContainer) { + log.info("Stopping external service {} ...", networkAlias); + serviceContainer.stop(); + log.info("Successfully stop external service {}", networkAlias); + } + + private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName, int numContainers, Function<String, T> containerCreator) {