sijie closed pull request #2587: [tests] improve connector related integration tests URL: https://github.com/apache/incubator-pulsar/pull/2587
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 08ff859e44..a509e19254 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -92,5 +92,11 @@ <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> + + <file> + <source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source> + <outputDirectory>connectors</outputDirectory> + <fileMode>644</fileMode> + </file> </files> </assembly> diff --git a/docker/pom.xml b/docker/pom.xml index bdc99f7297..302bda80cc 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -31,9 +31,6 @@ <groupId>org.apache.pulsar</groupId> <artifactId>docker-images</artifactId> <name>Apache Pulsar :: Docker Images</name> - <properties> - <docker.organization>apachepulsar</docker.organization> - </properties> <modules> <module>pulsar</module> <module>grafana</module> diff --git a/pom.xml b/pom.xml index 6d45a0093d..91ba14c55a 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,7 @@ flexible messaging model and an intuitive client API.</description> <redirectTestOutputToFile>true</redirectTestOutputToFile> <testRealAWS>false</testRealAWS> <testRetryCount>1</testRetryCount> + <docker.organization>apachepulsar</docker.organization> <!-- pin the protobuf-shaded version to make the pulsar build friendly to intellij --> <pulsar.protobuf.shaded.version>2.1.0-incubating</pulsar.protobuf.shaded.version> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 2937ca0caf..fb7a76cdff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -233,66 +233,6 @@ public void testReplayOnConsumerDisconnect() throws Exception { deleteTopic(topicName); } - @Test - public void testConsumersWithDifferentPermits() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/shared-topic4"; - final String subName = "sub4"; - final int numMsgs = 10000; - - final AtomicInteger msgCountConsumer1 = new AtomicInteger(0); - final AtomicInteger msgCountConsumer2 = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(numMsgs); - - int recvQ1 = 10; - Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1) - .messageListener((consumer, msg) -> { - msgCountConsumer1.incrementAndGet(); - try { - consumer.acknowledge(msg); - latch.countDown(); - } catch (PulsarClientException e) { - fail("Should not fail"); - } - }).subscribe(); - - int recvQ2 = 1; - Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2) - .messageListener((consumer, msg) -> { - msgCountConsumer2.incrementAndGet(); - try { - consumer.acknowledge(msg); - latch.countDown(); - } catch (PulsarClientException e) { - fail("Should not fail"); - } - }).subscribe(); - - List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .enableBatching(false) - .maxPendingMessages(numMsgs + 1) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - for (int i = 0; i < numMsgs; i++) { - String message = "msg-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - producer.close(); - - latch.await(5, TimeUnit.SECONDS); - - assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1); - assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1); - - consumer1.close(); - consumer2.close(); - - deleteTopic(topicName); - } - // this test is good to have to see the distribution, but every now and then it gets slightly different than the // expected numbers. keeping this disabled to not break the build, but nevertheless this gives good insight into // how the round robin distribution algorithm is behaving diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4ba7340016..b3f86eae0e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -146,7 +146,8 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception { ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); ThreadContext.put("instance", instanceConfig.getInstanceId()); - log.info("Starting Java Instance {}", instanceConfig.getFunctionDetails().getName()); + log.info("Starting Java Instance {} : \n Details = {}", + instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); // start the function thread loadJars(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 76375dcc67..db9e880b68 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -78,6 +78,9 @@ public TopicSchema(PulsarClient client) { private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) { if (GenericRecord.class.isAssignableFrom(clazz)) { return SchemaType.AUTO; + } else if (byte[].class.equals(clazz)) { + // if function uses bytes, we should ignore + return SchemaType.NONE; } else { Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join(); if (schema.isPresent()) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java similarity index 94% rename from pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java rename to pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index 3760d4072b..86546f3c96 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -51,7 +51,7 @@ * Users need to implement extractKeyValue function to use this sink. * This class assumes that the input will be JSON documents */ -public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> { +public class ElasticSearchSink implements Sink<byte[]> { protected static final String DOCUMENT = "doc"; @@ -74,7 +74,7 @@ public void close() throws Exception { @Override public void write(Record<byte[]> record) { - KeyValue<K, V> keyValue = extractKeyValue(record); + KeyValue<String, byte[]> keyValue = extractKeyValue(record); IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName()); indexRequest.type(DOCUMENT); indexRequest.source(keyValue.getValue(), XContentType.JSON); @@ -91,7 +91,10 @@ public void write(Record<byte[]> record) { } } - public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record); + public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) { + String key = record.getKey().orElseGet(null); + return new KeyValue<>(key, record.getValue()); + } private void createIndexIfNeeded() throws IOException { GetIndexRequest request = new GetIndexRequest(); diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java deleted file mode 100644 index 6cfa03d5c0..0000000000 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.elasticsearch; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; - -/** - * Concrete ElasticSearch sink. - * This class assumes that the input will be JSON documents - */ -public class ElasticSearchStringSink extends ElasticSearchAbstractSink<String, String> { - - @Override - public KeyValue<String, String> extractKeyValue(Record<byte[]> record) { - String key = record.getKey().orElseGet(() -> new String(record.getValue())); - return new KeyValue<>(key, new String(record.getValue())); - } -} diff --git a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml index 0307516cc8..97789e9851 100644 --- a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml @@ -17,6 +17,6 @@ # under the License. # -name: Elastic Search +name: elastic_search description: Writes data into Elastic Search -sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink +sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index ea2b886e39..f1888293f8 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -56,7 +56,7 @@ @Mock protected SinkContext mockSinkContext; protected Map<String, Object> map; - protected ElasticSearchStringSink sink; + protected ElasticSearchSink sink; @BeforeClass public static final void init() { @@ -71,7 +71,7 @@ public static final void init() { public final void setUp() throws Exception { map = new HashMap<String, Object> (); map.put("elasticSearchUrl", "http://localhost:9200"); - sink = new ElasticSearchStringSink(); + sink = new ElasticSearchSink(); mockRecord = mock(Record.class); mockSinkContext = mock(SinkContext.class); diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java index a92a368f1c..50ce4b97b6 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java @@ -70,6 +70,10 @@ public void close() throws IOException { } } + protected Properties beforeCreateProducer(Properties props) { + return props; + } + @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { kafkaSinkConfig = KafkaSinkConfig.load(config); @@ -89,7 +93,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass()); - producer = new KafkaProducer<>(props); + producer = new KafkaProducer<>(beforeCreateProducer(props)); log.info("Kafka sink started."); } diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java similarity index 50% rename from pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java rename to pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java index 89e3e7f8d6..9ce2bdcb77 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java @@ -19,16 +19,31 @@ package org.apache.pulsar.io.kafka; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.KeyValue; /** - * Kafka sink that treats incoming messages on the input topic as Strings - * and write identical key/value pairs. + * Kafka sink should treats incoming messages as pure bytes. So we don't + * apply schema into it. */ -public class KafkaStringSink extends KafkaAbstractSink<String, String> { +@Slf4j +public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> { + + @Override + protected Properties beforeCreateProducer(Properties props) { + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + log.info("Created kafka producer config : {}", props); + return props; + } + @Override - public KeyValue<String, String> extractKeyValue(Record<byte[]> record) { - return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue())); + public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) { + return new KeyValue<>(record.getKey().orElse(null), record.getValue()); } } \ No newline at end of file diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml index a7bc81355c..7afc154688 100644 --- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,4 +20,4 @@ name: kafka description: Kafka source and sink connector sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource -sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink +sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md index 8b8cfd340d..4a48bc09ce 100644 --- a/site2/docs/io-quickstart.md +++ b/site2/docs/io-quickstart.md @@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors Example output: ```json -[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}] +[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}] ``` If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`, diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md index 7b21379964..afa8e31a6b 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md @@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors Example output: ```json -[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}] +[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}] ``` If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 5f11eef637..e9ea6186c3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testcontainers.containers.GenericContainer; import org.testng.annotations.Test; /** @@ -62,17 +63,17 @@ @Test public void testKafkaSink() throws Exception { - testSink(new KafkaSinkTester(), true); + testSink(new KafkaSinkTester(), true, new KafkaSourceTester()); } @Test public void testCassandraSink() throws Exception { - testSink(new CassandraSinkTester(), true); + testSink(CassandraSinkTester.createTester(true), true); } @Test public void testCassandraArchiveSink() throws Exception { - testSink(new CassandraSinkArchiveTester(), false); + testSink(CassandraSinkTester.createTester(false), false); } @Test(enabled = false) @@ -91,8 +92,31 @@ public void testElasticSearchSink() throws Exception { } private void testSink(SinkTester tester, boolean builtin) throws Exception { - tester.findSinkServiceContainer(pulsarCluster.getExternalServices()); + tester.startServiceContainer(pulsarCluster); + try { + runSinkTester(tester, builtin); + } finally { + tester.stopServiceContainer(pulsarCluster); + } + } + + private <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester, + boolean builtinSink, + SourceTester<ServiceContainerT> sourceTester) + throws Exception { + ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster); + try { + runSinkTester(sinkTester, builtinSink); + if (null != sourceTester) { + sourceTester.setServiceContainer(serviceContainer); + testSource(sourceTester); + } + } finally { + sinkTester.stopServiceContainer(pulsarCluster); + } + } + private void runSinkTester(SinkTester tester, boolean builtin) throws Exception { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String inputTopicName = "test-sink-connector-" @@ -357,14 +381,7 @@ protected void getSinkInfoNotFound(String tenant, String namespace, String sinkN // Source Test // - @Test - public void testKafkaSource() throws Exception { - testSource(new KafkaSourceTester()); - } - private void testSource(SourceTester tester) throws Exception { - tester.findSourceServiceContainer(pulsarCluster.getExternalServices()); - final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "test-source-connector-" diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 6578d08cf2..6f4d012e24 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -161,7 +161,7 @@ public String generateUpdateFunctionCommand() { } public String generateUpdateFunctionCommand(String codeFile) { - StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m "); + StringBuilder commandBuilder = new StringBuilder(); if (adminUrl == null) { commandBuilder.append("/pulsar/bin/pulsar-admin functions update"); } else { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java deleted file mode 100644 index 86c76894c5..0000000000 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.tests.integration.io; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.testcontainers.containers.GenericContainer; - -import java.util.List; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkState; -import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * A tester for testing cassandra sink submitted as an archive. - */ -@Slf4j -public class CassandraSinkArchiveTester extends SinkTester { - - private static final String NAME = "cassandra"; - - private static final String ROOTS = "cassandra"; - private static final String KEY = "key"; - private static final String COLUMN = "col"; - - private final String keySpace; - private final String tableName; - - private CassandraContainer cassandraCluster; - - private Cluster cluster; - private Session session; - - public CassandraSinkArchiveTester() { - super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink"); - - String suffix = randomName(8) + "_" + System.currentTimeMillis(); - this.keySpace = "keySpace_" + suffix; - this.tableName = "tableName_" + suffix; - - sinkConfig.put("roots", ROOTS); - sinkConfig.put("keyspace", keySpace); - sinkConfig.put("columnFamily", tableName); - sinkConfig.put("keyname", KEY); - sinkConfig.put("columnName", COLUMN); - } - - @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof CassandraContainer, - "No kafka service found in the cluster"); - - this.cassandraCluster = (CassandraContainer) container; - } - - @Override - public void prepareSink() { - // build the sink - cluster = Cluster.builder() - .addContactPoint("localhost") - .withPort(cassandraCluster.getCassandraPort()) - .build(); - - // connect to the cluster - session = cluster.connect(); - log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort()); - - String createKeySpace = - "CREATE KEYSPACE " + keySpace - + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; "; - log.info(createKeySpace); - session.execute(createKeySpace); - session.execute("USE " + keySpace); - - String createTable = "CREATE TABLE " + tableName - + "(" + KEY + " text PRIMARY KEY, " - + COLUMN + " text);"; - log.info(createTable); - session.execute(createTable); - } - - @Override - public void validateSinkResult(Map<String, String> kvs) { - String query = "SELECT * FROM " + tableName + ";"; - ResultSet result = session.execute(query); - List<Row> rows = result.all(); - assertEquals(kvs.size(), rows.size()); - for (Row row : rows) { - String key = row.getString(KEY); - String value = row.getString(COLUMN); - - String expectedValue = kvs.get(key); - assertNotNull(expectedValue); - assertEquals(expectedValue, value); - } - } -} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java index c9d3e5a11f..3309358abc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java @@ -24,12 +24,11 @@ import com.datastax.driver.core.Session; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.testcontainers.containers.GenericContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import java.util.List; import java.util.Map; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -38,7 +37,15 @@ * A tester for testing cassandra sink. */ @Slf4j -public class CassandraSinkTester extends SinkTester { +public class CassandraSinkTester extends SinkTester<CassandraContainer> { + + public static CassandraSinkTester createTester(boolean builtin) { + if (builtin) { + return new CassandraSinkTester(builtin); + } else { + return new CassandraSinkTester(); + } + } private static final String NAME = "cassandra"; @@ -49,13 +56,11 @@ private final String keySpace; private final String tableName; - private CassandraContainer cassandraCluster; - private Cluster cluster; private Session session; - public CassandraSinkTester() { - super(SinkType.CASSANDRA); + private CassandraSinkTester() { + super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink"); String suffix = randomName(8) + "_" + System.currentTimeMillis(); this.keySpace = "keySpace_" + suffix; @@ -68,13 +73,23 @@ public CassandraSinkTester() { sinkConfig.put("columnName", COLUMN); } - @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof CassandraContainer, - "No kafka service found in the cluster"); + private CassandraSinkTester(boolean builtin) { + super(NAME, SinkType.CASSANDRA); + + String suffix = randomName(8) + "_" + System.currentTimeMillis(); + this.keySpace = "keySpace_" + suffix; + this.tableName = "tableName_" + suffix; - this.cassandraCluster = (CassandraContainer) container; + sinkConfig.put("roots", ROOTS); + sinkConfig.put("keyspace", keySpace); + sinkConfig.put("columnFamily", tableName); + sinkConfig.put("keyname", KEY); + sinkConfig.put("columnName", COLUMN); + } + + @Override + protected CassandraContainer createSinkService(PulsarCluster cluster) { + return new CassandraContainer(cluster.getClusterName()); } @Override @@ -82,12 +97,12 @@ public void prepareSink() { // build the sink cluster = Cluster.builder() .addContactPoint("localhost") - .withPort(cassandraCluster.getCassandraPort()) + .withPort(serviceContainer.getCassandraPort()) .build(); // connect to the cluster session = cluster.connect(); - log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort()); + log.info("Connecting to cassandra cluster at localhost:{}", serviceContainer.getCassandraPort()); String createKeySpace = "CREATE KEYSPACE " + keySpace diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java index 0effc8eadd..eee208e2e3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java @@ -18,57 +18,57 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Map; -import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.containers.GenericContainer; -public class ElasticSearchSinkTester extends SinkTester { - +public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> { + private RestHighLevelClient elasticClient; public ElasticSearchSinkTester() { - super(SinkType.ELASTIC_SEARCH); + super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH); - sinkConfig.put("elasticSearchUrl", "http://localhost:9200"); + sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200"); sinkConfig.put("indexName", "test-index"); } + @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices) { - GenericContainer<?> container = externalServices.get(ElasticSearchContainer.NAME); - checkState(container instanceof ElasticSearchContainer, - "No ElasticSearch service found in the cluster"); + protected ElasticSearchContainer createSinkService(PulsarCluster cluster) { + return new ElasticSearchContainer(cluster.getClusterName()); } @Override public void prepareSink() throws Exception { - RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http")); + RestClientBuilder builder = RestClient.builder( + new HttpHost( + "localhost", + serviceContainer.getMappedPort(9200), + "http")); elasticClient = new RestHighLevelClient(builder); } @Override public void validateSinkResult(Map<String, String> kvs) { - SearchRequest searchRequest = new SearchRequest("test-index"); searchRequest.types("doc"); try { - Header headers = null; - SearchResponse searchResult = elasticClient.search(searchRequest, headers); - assertTrue(searchResult.getHits().getTotalHits() > 0); + SearchResponse searchResult = elasticClient.search(searchRequest); + assertTrue(searchResult.getHits().getTotalHits() > 0, searchResult.toString()); } catch (Exception e) { - e.printStackTrace(); + fail("Encountered exception on validating elastic search results", e); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java index 46c5f242df..957b93acbd 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java @@ -21,18 +21,14 @@ import java.util.Map; import org.apache.pulsar.tests.integration.containers.HdfsContainer; -import org.testcontainers.containers.GenericContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import static com.google.common.base.Preconditions.checkState; - -public class HdfsSinkTester extends SinkTester { +public class HdfsSinkTester extends SinkTester<HdfsContainer> { private static final String NAME = "HDFS"; - private HdfsContainer hdfsCluster; - public HdfsSinkTester() { - super(SinkType.HDFS); + super(NAME, SinkType.HDFS); // TODO How do I get the core-site.xml, and hdfs-site.xml files from the container? sinkConfig.put("hdfsConfigResources", ""); @@ -40,20 +36,18 @@ public HdfsSinkTester() { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster"); - this.hdfsCluster = (HdfsContainer) container; + protected HdfsContainer createSinkService(PulsarCluster cluster) { + return new HdfsContainer(cluster.getClusterName()); } @Override public void prepareSink() throws Exception { // Create the test directory - hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing"); - hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing"); + serviceContainer.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing"); + serviceContainer.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing"); // Execute all future commands as the "tester" user - hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester"); + serviceContainer.execInContainer("export HADOOP_USER_NAME=tester"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java index 7c14ba96c0..72d9b0152c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java @@ -18,22 +18,23 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; +import com.github.dockerjava.api.command.CreateContainerCmd; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Map; +import java.util.function.Consumer; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.testcontainers.containers.GenericContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.MySQLContainer; /** @@ -41,7 +42,7 @@ * This will use MySql as DB server */ @Slf4j -public class JdbcSinkTester extends SinkTester { +public class JdbcSinkTester extends SinkTester<MySQLContainer> { /** * A Simple class to test jdbc class, @@ -57,14 +58,14 @@ } private static final String NAME = "jdbc"; + private static final String MYSQL = "mysql"; - private MySQLContainer mySQLContainer; private AvroSchema<Foo> schema = AvroSchema.of(Foo.class); private String tableName = "test"; private Connection connection; public JdbcSinkTester() { - super(SinkType.JDBC); + super(NAME, SinkType.JDBC); // container default value is test sinkConfig.put("userName", "test"); @@ -79,21 +80,28 @@ public JdbcSinkTester() { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get("mysql"); - checkState(container instanceof MySQLContainer, - "No MySQL service found in the cluster"); - - this.mySQLContainer = (MySQLContainer) container; - log.info("find sink service container: {}", mySQLContainer.getContainerName()); + protected MySQLContainer createSinkService(PulsarCluster cluster) { + return (MySQLContainer) new MySQLContainer() + .withUsername("test") + .withPassword("test") + .withDatabaseName("test") + .withNetworkAliases(MYSQL) + .withCreateContainerCmdModifier(new Consumer<CreateContainerCmd>() { + @Override + public void accept(CreateContainerCmd createContainerCmd) { + createContainerCmd + .withName(MYSQL) + .withHostName(cluster.getClusterName() + "-" + MYSQL); + } + }); } @Override public void prepareSink() throws Exception { - String jdbcUrl = mySQLContainer.getJdbcUrl(); + String jdbcUrl = serviceContainer.getJdbcUrl(); // we need set mysql server address in cluster network. - sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test"); - String driver = mySQLContainer.getDriverClassName(); + sinkConfig.put("jdbcUrl", "jdbc:mysql://" + MYSQL + ":3306/test"); + String driver = serviceContainer.getDriverClassName(); Class.forName(driver); connection = DriverManager.getConnection(jdbcUrl, "test", "test"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java index ff79e1a302..6713cc181f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -32,8 +31,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.Container.ExecResult; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -41,18 +40,15 @@ * A tester for testing kafka sink. */ @Slf4j -public class KafkaSinkTester extends SinkTester { +public class KafkaSinkTester extends SinkTester<KafkaContainer> { private static final String NAME = "kafka"; private final String kafkaTopicName; - - private KafkaContainer kafkaContainer; - private KafkaConsumer<String, String> kafkaConsumer; public KafkaSinkTester() { - super(SinkType.KAFKA); + super(NAME, SinkType.KAFKA); String suffix = randomName(8) + "_" + System.currentTimeMillis(); this.kafkaTopicName = "kafka_sink_topic_" + suffix; @@ -64,17 +60,19 @@ public KafkaSinkTester() { } @Override - public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof KafkaContainer, - "No kafka service found in the cluster"); - - this.kafkaContainer = (KafkaContainer) container; + protected KafkaContainer createSinkService(PulsarCluster cluster) { + final String kafkaServiceName = NAME; + return new KafkaContainer() + .withEmbeddedZookeeper() + .withNetworkAliases(kafkaServiceName) + .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd + .withName(kafkaServiceName) + .withHostName(cluster.getClusterName() + "-" + kafkaServiceName)); } @Override public void prepareSink() throws Exception { - ExecResult execResult = kafkaContainer.execInContainer( + ExecResult execResult = serviceContainer.execInContainer( "/usr/bin/kafka-topics", "--create", "--zookeeper", @@ -91,7 +89,7 @@ public void prepareSink() throws Exception { kafkaConsumer = new KafkaConsumer<>( ImmutableMap.of( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ), diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java index 4928f00cfb..cee17b1451 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.tests.integration.io; -import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName; import static org.testng.Assert.assertTrue; @@ -35,7 +34,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.testcontainers.containers.Container.ExecResult; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -43,7 +41,7 @@ * A tester for testing kafka source. */ @Slf4j -public class KafkaSourceTester extends SourceTester { +public class KafkaSourceTester extends SourceTester<KafkaContainer> { private static final String NAME = "kafka"; @@ -68,12 +66,8 @@ public KafkaSourceTester() { } @Override - public void findSourceServiceContainer(Map<String, GenericContainer<?>> containers) { - GenericContainer<?> container = containers.get(NAME); - checkState(container instanceof KafkaContainer, - "No kafka service found in the cluster"); - - this.kafkaContainer = (KafkaContainer) container; + public void setServiceContainer(KafkaContainer container) { + this.kafkaContainer = container; } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java index d2917e6a3e..2dd0759de2 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java @@ -21,6 +21,7 @@ import java.util.Map; import lombok.Getter; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.containers.GenericContainer; import org.testng.collections.Maps; @@ -28,7 +29,7 @@ * A tester used for testing a specific sink. */ @Getter -public abstract class SinkTester { +public abstract class SinkTester<ServiceContainerT extends GenericContainer> { public enum SinkType { UNDEFINED, @@ -39,19 +40,23 @@ ELASTIC_SEARCH } + protected final String networkAlias; protected final SinkType sinkType; protected final String sinkArchive; protected final String sinkClassName; protected final Map<String, Object> sinkConfig; + protected ServiceContainerT serviceContainer; - public SinkTester(SinkType sinkType) { + public SinkTester(String networkAlias, SinkType sinkType) { + this.networkAlias = networkAlias; this.sinkType = sinkType; this.sinkArchive = null; this.sinkClassName = null; this.sinkConfig = Maps.newHashMap(); } - public SinkTester(String sinkArchive, String sinkClassName) { + public SinkTester(String networkAlias, String sinkArchive, String sinkClassName) { + this.networkAlias = networkAlias; this.sinkType = SinkType.UNDEFINED; this.sinkArchive = sinkArchive; this.sinkClassName = sinkClassName; @@ -62,7 +67,19 @@ public SinkTester(String sinkArchive, String sinkClassName) { return Schema.STRING; } - public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices); + protected abstract ServiceContainerT createSinkService(PulsarCluster cluster); + + public ServiceContainerT startServiceContainer(PulsarCluster cluster) { + this.serviceContainer = createSinkService(cluster); + cluster.startService(networkAlias, serviceContainer); + return serviceContainer; + } + + public void stopServiceContainer(PulsarCluster cluster) { + if (null != serviceContainer) { + cluster.stopService(networkAlias, serviceContainer); + } + } public SinkType sinkType() { return sinkType; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java index dc58f2ffbb..f1feb70eef 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java @@ -27,7 +27,7 @@ * A tester used for testing a specific source. */ @Getter -public abstract class SourceTester { +public abstract class SourceTester<ServiceContainerT extends GenericContainer> { protected final String sourceType; protected final Map<String, Object> sourceConfig; @@ -37,7 +37,7 @@ protected SourceTester(String sourceType) { this.sourceConfig = Maps.newHashMap(); } - public abstract void findSourceServiceContainer(Map<String, GenericContainer<?>> externalServices); + public abstract void setServiceContainer(ServiceContainerT serviceContainer); public String sourceType() { return sourceType; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index b4a6b83f58..20b9da0373 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -18,19 +18,10 @@ */ package org.apache.pulsar.tests.integration.suites; -import java.util.Map; -import org.apache.pulsar.tests.integration.containers.CassandraContainer; -import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer; -import org.apache.pulsar.tests.integration.containers.HdfsContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder; import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.MySQLContainer; import org.testng.ITest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; -import org.testng.collections.Maps; public class PulsarTestSuite extends PulsarClusterTestBase implements ITest { @@ -46,47 +37,6 @@ public void tearDownCluster() { super.tearDownCluster(); } - @Override - protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpecBuilder specBuilder) { - PulsarClusterSpecBuilder builder = super.beforeSetupCluster(clusterName, specBuilder); - - // start functions - - // register external services - Map<String, GenericContainer<?>> externalServices = Maps.newHashMap(); - - final String kafkaServiceName = "kafka"; - externalServices.put( - kafkaServiceName, - new KafkaContainer() - .withEmbeddedZookeeper() - .withNetworkAliases(kafkaServiceName) - .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd - .withName(kafkaServiceName) - .withHostName(clusterName + "-" + kafkaServiceName))); - - final String cassandraServiceName = "cassandra"; - externalServices.put( - cassandraServiceName, - new CassandraContainer(clusterName)); - - // use mySQL for jdbc test - final String jdbcServiceName = "mysql"; - externalServices.put( - jdbcServiceName, - new MySQLContainer() - .withExposedPorts(3306)); - - externalServices.put( - ElasticSearchContainer.NAME, - new ElasticSearchContainer(ElasticSearchContainer.NAME) - .withExposedPorts(9200)); - - builder = builder.externalServices(externalServices); - - return builder; - } - @Override public String getTestName() { return "pulsar-test-suite"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index f78097b65c..af6071220d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -209,6 +209,23 @@ public void start() throws Exception { } } + public void startService(String networkAlias, + GenericContainer<?> serviceContainer) { + log.info("Starting external service {} ...", networkAlias); + serviceContainer.withNetwork(network); + serviceContainer.withNetworkAliases(networkAlias); + serviceContainer.start(); + log.info("Successfully start external service {}", networkAlias); + } + + public void stopService(String networkAlias, + GenericContainer<?> serviceContainer) { + log.info("Stopping external service {} ...", networkAlias); + serviceContainer.stop(); + log.info("Successfully stop external service {}", networkAlias); + } + + private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName, int numContainers, Function<String, T> containerCreator) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services