This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7530d64  [tests] improve connector related integration tests (#2587)
7530d64 is described below

commit 7530d64a679a0783122b18058c1148c89c0fee0a
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Sep 18 14:50:51 2018 -0700

    [tests] improve connector related integration tests (#2587)
    
    *Motivation*
    
    As more and more connectors are added, it becomes expensive to start all 
external services at the beginning.
    
    *Changes*
    
    - refactor the connector testing framework to start external services before 
test methods
    - fix kafka, cassandra and mysql connectors
---
 distribution/io/src/assemble/io.xml                |   6 +
 .../broker/service/PersistentQueueE2ETest.java     |  60 ----------
 .../functions/instance/JavaInstanceRunnable.java   |   3 +-
 .../pulsar/functions/source/TopicSchema.java       |   3 +
 ...rchAbstractSink.java => ElasticSearchSink.java} |   9 +-
 .../io/elasticsearch/ElasticSearchStringSink.java  |  35 ------
 .../resources/META-INF/services/pulsar-io.yaml     |   4 +-
 .../io/elasticsearch/ElasticSearchSinkTests.java   |   4 +-
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |   6 +-
 .../{KafkaStringSink.java => KafkaBytesSink.java}  |  25 ++++-
 .../resources/META-INF/services/pulsar-io.yaml     |   2 +-
 site2/docs/io-quickstart.md                        |   2 +-
 .../version-2.1.0-incubating/io-quickstart.md      |   2 +-
 .../integration/functions/PulsarFunctionsTest.java |  39 +++++--
 .../functions/utils/CommandGenerator.java          |   2 +-
 .../integration/io/CassandraSinkArchiveTester.java | 121 ---------------------
 .../tests/integration/io/CassandraSinkTester.java  |  45 +++++---
 .../integration/io/ElasticSearchSinkTester.java    |  34 +++---
 .../tests/integration/io/HdfsSinkTester.java       |  22 ++--
 .../tests/integration/io/JdbcSinkTester.java       |  38 ++++---
 .../tests/integration/io/KafkaSinkTester.java      |  28 +++--
 .../tests/integration/io/KafkaSourceTester.java    |  12 +-
 .../pulsar/tests/integration/io/SinkTester.java    |  25 ++++-
 .../pulsar/tests/integration/io/SourceTester.java  |   4 +-
 .../tests/integration/suites/PulsarTestSuite.java  |  50 ---------
 .../integration/topologies/PulsarCluster.java      |  17 +++
 26 files changed, 212 insertions(+), 386 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 08ff859..a509e19 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      
<source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0..fb7a76c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public class PersistentQueueE2ETest extends BrokerTestBase 
{
         deleteTopic(topicName);
     }
 
-    @Test
-    public void testConsumersWithDifferentPermits() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
-        final String subName = "sub4";
-        final int numMsgs = 10000;
-
-        final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
-        final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(numMsgs);
-
-        int recvQ1 = 10;
-        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
-                .messageListener((consumer, msg) -> {
-                    msgCountConsumer1.incrementAndGet();
-                    try {
-                        consumer.acknowledge(msg);
-                        latch.countDown();
-                    } catch (PulsarClientException e) {
-                        fail("Should not fail");
-                    }
-                }).subscribe();
-
-        int recvQ2 = 1;
-        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
-                .messageListener((consumer, msg) -> {
-                    msgCountConsumer2.incrementAndGet();
-                    try {
-                        consumer.acknowledge(msg);
-                        latch.countDown();
-                    } catch (PulsarClientException e) {
-                        fail("Should not fail");
-                    }
-                }).subscribe();
-
-        List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .maxPendingMessages(numMsgs + 1)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-        for (int i = 0; i < numMsgs; i++) {
-            String message = "msg-" + i;
-            futures.add(producer.sendAsync(message.getBytes()));
-        }
-        FutureUtil.waitForAll(futures).get();
-        producer.close();
-
-        latch.await(5, TimeUnit.SECONDS);
-
-        assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + 
recvQ2), numMsgs * 0.1);
-        assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), 
numMsgs * 0.1);
-
-        consumer1.close();
-        consumer2.close();
-
-        deleteTopic(topicName);
-    }
-
     // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
     // expected numbers. keeping this disabled to not break the build, but 
nevertheless this gives good insight into
     // how the round robin distribution algorithm is behaving
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4ba7340..b3f86ea 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -146,7 +146,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         ThreadContext.put("functionname", 
instanceConfig.getFunctionDetails().getName());
         ThreadContext.put("instance", instanceConfig.getInstanceId());
 
-        log.info("Starting Java Instance {}", 
instanceConfig.getFunctionDetails().getName());
+        log.info("Starting Java Instance {} : \n Details = {}",
+            instanceConfig.getFunctionDetails().getName(), 
instanceConfig.getFunctionDetails());
 
         // start the function thread
         loadJars();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 76375dc..db9e880 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -78,6 +78,9 @@ public class TopicSchema {
     private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
         if (GenericRecord.class.isAssignableFrom(clazz)) {
             return SchemaType.AUTO;
+        } else if (byte[].class.equals(clazz)) {
+            // the function consumes raw bytes, so skip the topic schema lookup
+            return SchemaType.NONE;
         } else {
             Optional<SchemaInfo> schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
             if (schema.isPresent()) {
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
similarity index 94%
rename from 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
rename to 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 3760d40..86546f3 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -51,7 +51,7 @@ import org.elasticsearch.common.xcontent.XContentType;
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
-public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
+public class ElasticSearchSink implements Sink<byte[]> {
 
     protected static final String DOCUMENT = "doc";
 
@@ -74,7 +74,7 @@ public abstract class ElasticSearchAbstractSink<K, V> 
implements Sink<byte[]> {
 
     @Override
     public void write(Record<byte[]> record) {
-        KeyValue<K, V> keyValue = extractKeyValue(record);
+        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
         IndexRequest indexRequest = 
Requests.indexRequest(elasticSearchConfig.getIndexName());
         indexRequest.type(DOCUMENT);
         indexRequest.source(keyValue.getValue(), XContentType.JSON);
@@ -91,7 +91,10 @@ public abstract class ElasticSearchAbstractSink<K, V> 
implements Sink<byte[]> {
         }
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
+    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElse(null); // key is optional: null when absent (orElseGet(null) would NPE)
+        return new KeyValue<>(key, record.getValue());
+    }
 
     private void createIndexIfNeeded() throws IOException {
         GetIndexRequest request = new GetIndexRequest();
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
deleted file mode 100644
index 6cfa03d..0000000
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * Concrete ElasticSearch sink.
- * This class assumes that the input will be JSON documents
- */
-public class ElasticSearchStringSink extends ElasticSearchAbstractSink<String, 
String> {
-
-    @Override
-    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
-        return new KeyValue<>(key, new String(record.getValue()));
-    }
-}
diff --git 
a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
index 0307516..97789e9 100644
--- 
a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: Elastic Search
+name: elastic_search
 description: Writes data into Elastic Search
-sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink
+sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index ea2b886..f188829 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -56,7 +56,7 @@ public class ElasticSearchSinkTests {
     @Mock
     protected SinkContext mockSinkContext;   
     protected Map<String, Object> map;
-    protected ElasticSearchStringSink sink;
+    protected ElasticSearchSink sink;
     
     @BeforeClass
     public static final void init() {
@@ -71,7 +71,7 @@ public class ElasticSearchSinkTests {
     public final void setUp() throws Exception {
         map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:9200");
-        sink = new ElasticSearchStringSink();
+        sink = new ElasticSearchSink();
         
         mockRecord = mock(Record.class);
         mockSinkContext = mock(SinkContext.class);
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index a92a368..50ce4b9 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -70,6 +70,10 @@ public abstract class KafkaAbstractSink<K, V> implements 
Sink<byte[]> {
         }
     }
 
+    protected Properties beforeCreateProducer(Properties props) {
+        return props;
+    }
+
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
@@ -89,7 +93,7 @@ public abstract class KafkaAbstractSink<K, V> implements 
Sink<byte[]> {
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
kafkaSinkConfig.getKeySerializerClass());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
kafkaSinkConfig.getValueSerializerClass());
 
-        producer = new KafkaProducer<>(props);
+        producer = new KafkaProducer<>(beforeCreateProducer(props));
 
         log.info("Kafka sink started.");
     }
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
similarity index 50%
rename from 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
rename to 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 89e3e7f..9ce2bdc 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -19,16 +19,31 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 
 /**
- * Kafka sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
+ * Kafka sink that treats incoming messages as pure bytes, so no schema is
+ * applied to them.
  */
-public class KafkaStringSink extends KafkaAbstractSink<String, String> {
+@Slf4j
+public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
+
+    @Override
+    protected Properties beforeCreateProducer(Properties props) {
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        log.info("Created kafka producer config : {}", props);
+        return props;
+    }
+
     @Override
-    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
-        return new KeyValue<>(record.getKey().orElse(null), new 
String(record.getValue()));
+    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+        return new KeyValue<>(record.getKey().orElse(null), record.getValue());
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index a7bc813..7afc154 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,4 @@
 name: kafka
 description: Kafka source and sink connector
 sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
-sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink
+sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 8b8cfd3..4a48bc0 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
 
 Example output:
 ```json
-[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
 sink connect [...]
+[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis
 sink connecto [...]
 ```
 
 If an error occurred while starting Pulsar service, you may be able to seen 
exception at the terminal you are running `pulsar/standalone`,
diff --git 
a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md 
b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
index 7b21379..afa8e31 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
@@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
 
 Example output:
 ```json
-[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
 sink connect [...]
+[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis
 sink connecto [...]
 ```
 
 If an error occurred while starting Pulsar service, you may be able to seen 
exception at the terminal you are running `pulsar/standalone`,
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f11eef..e9ea618 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.tests.integration.io.*;
 import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
 
 /**
@@ -62,17 +63,17 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester(), true);
+        testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
     }
 
     @Test
     public void testCassandraSink() throws Exception {
-        testSink(new CassandraSinkTester(), true);
+        testSink(CassandraSinkTester.createTester(true), true);
     }
 
     @Test
     public void testCassandraArchiveSink() throws Exception {
-        testSink(new CassandraSinkArchiveTester(), false);
+        testSink(CassandraSinkTester.createTester(false), false);
     }
     
     @Test(enabled = false)
@@ -91,8 +92,31 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     }
     
     private void testSink(SinkTester tester, boolean builtin) throws Exception 
{
-        tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
+        tester.startServiceContainer(pulsarCluster);
+        try {
+            runSinkTester(tester, builtin);
+        } finally {
+            tester.stopServiceContainer(pulsarCluster);
+        }
+    }
+
 
+    private <ServiceContainerT extends GenericContainer>  void 
testSink(SinkTester<ServiceContainerT> sinkTester,
+                                                                        
boolean builtinSink,
+                                                                        
SourceTester<ServiceContainerT> sourceTester)
+            throws Exception {
+        ServiceContainerT serviceContainer = 
sinkTester.startServiceContainer(pulsarCluster);
+        try {
+            runSinkTester(sinkTester, builtinSink);
+            if (null != sourceTester) {
+                sourceTester.setServiceContainer(serviceContainer);
+                testSource(sourceTester);
+            }
+        } finally {
+            sinkTester.stopServiceContainer(pulsarCluster);
+        }
+    }
+    private void runSinkTester(SinkTester tester, boolean builtin) throws 
Exception {
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String inputTopicName = "test-sink-connector-"
@@ -357,14 +381,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     // Source Test
     //
 
-    @Test
-    public void testKafkaSource() throws Exception {
-        testSource(new KafkaSourceTester());
-    }
-
     private void testSource(SourceTester tester)  throws Exception {
-        tester.findSourceServiceContainer(pulsarCluster.getExternalServices());
-
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "test-source-connector-"
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6578d08..6f4d012 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -161,7 +161,7 @@ public class CommandGenerator {
     }
 
     public String generateUpdateFunctionCommand(String codeFile) {
-        StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m 
");
+        StringBuilder commandBuilder = new StringBuilder();
         if (adminUrl == null) {
             commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
         } else {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
deleted file mode 100644
index 86c7689..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
-
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
-import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * A tester for testing cassandra sink submitted as an archive.
- */
-@Slf4j
-public class CassandraSinkArchiveTester extends SinkTester {
-
-    private static final String NAME = "cassandra";
-
-    private static final String ROOTS = "cassandra";
-    private static final String KEY = "key";
-    private static final String COLUMN = "col";
-
-    private final String keySpace;
-    private final String tableName;
-
-    private CassandraContainer cassandraCluster;
-
-    private Cluster cluster;
-    private Session session;
-
-    public CassandraSinkArchiveTester() {
-        
super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", 
"org.apache.pulsar.io.cassandra.CassandraStringSink");
-
-        String suffix = randomName(8) + "_" + System.currentTimeMillis();
-        this.keySpace = "keySpace_" + suffix;
-        this.tableName = "tableName_" + suffix;
-
-        sinkConfig.put("roots", ROOTS);
-        sinkConfig.put("keyspace", keySpace);
-        sinkConfig.put("columnFamily", tableName);
-        sinkConfig.put("keyname", KEY);
-        sinkConfig.put("columnName", COLUMN);
-    }
-
-    @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof CassandraContainer,
-            "No kafka service found in the cluster");
-
-        this.cassandraCluster = (CassandraContainer) container;
-    }
-
-    @Override
-    public void prepareSink() {
-        // build the sink
-        cluster = Cluster.builder()
-            .addContactPoint("localhost")
-            .withPort(cassandraCluster.getCassandraPort())
-            .build();
-
-        // connect to the cluster
-        session = cluster.connect();
-        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
-
-        String createKeySpace =
-            "CREATE KEYSPACE " + keySpace
-                + " WITH replication = {'class':'SimpleStrategy', 
'replication_factor':1}; ";
-        log.info(createKeySpace);
-        session.execute(createKeySpace);
-        session.execute("USE " + keySpace);
-
-        String createTable = "CREATE TABLE " + tableName
-            + "(" + KEY + " text PRIMARY KEY, "
-            + COLUMN + " text);";
-        log.info(createTable);
-        session.execute(createTable);
-    }
-
-    @Override
-    public void validateSinkResult(Map<String, String> kvs) {
-        String query = "SELECT * FROM " + tableName + ";";
-        ResultSet result = session.execute(query);
-        List<Row> rows = result.all();
-        assertEquals(kvs.size(), rows.size());
-        for (Row row : rows) {
-            String key = row.getString(KEY);
-            String value = row.getString(COLUMN);
-
-            String expectedValue = kvs.get(key);
-            assertNotNull(expectedValue);
-            assertEquals(expectedValue, value);
-        }
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index c9d3e5a..3309358 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -24,12 +24,11 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
 import java.util.List;
 import java.util.Map;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -38,7 +37,15 @@ import static org.junit.Assert.assertNotNull;
  * A tester for testing cassandra sink.
  */
 @Slf4j
-public class CassandraSinkTester extends SinkTester {
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+    public static CassandraSinkTester createTester(boolean builtin) {
+        if (builtin) {
+            return new CassandraSinkTester(builtin);
+        } else {
+            return new CassandraSinkTester();
+        }
+    }
 
     private static final String NAME = "cassandra";
 
@@ -49,13 +56,11 @@ public class CassandraSinkTester extends SinkTester {
     private final String keySpace;
     private final String tableName;
 
-    private CassandraContainer cassandraCluster;
-
     private Cluster cluster;
     private Session session;
 
-    public CassandraSinkTester() {
-        super(SinkType.CASSANDRA);
+    private CassandraSinkTester() {
+        super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
@@ -68,13 +73,23 @@ public class CassandraSinkTester extends SinkTester {
         sinkConfig.put("columnName", COLUMN);
     }
 
-    @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof CassandraContainer,
-            "No kafka service found in the cluster");
+    private CassandraSinkTester(boolean builtin) {
+        super(NAME, SinkType.CASSANDRA);
+
+        String suffix = randomName(8) + "_" + System.currentTimeMillis();
+        this.keySpace = "keySpace_" + suffix;
+        this.tableName = "tableName_" + suffix;
 
-        this.cassandraCluster = (CassandraContainer) container;
+        sinkConfig.put("roots", ROOTS);
+        sinkConfig.put("keyspace", keySpace);
+        sinkConfig.put("columnFamily", tableName);
+        sinkConfig.put("keyname", KEY);
+        sinkConfig.put("columnName", COLUMN);
+    }
+
+    @Override
+    protected CassandraContainer createSinkService(PulsarCluster cluster) {
+        return new CassandraContainer(cluster.getClusterName());
     }
 
     @Override
@@ -82,12 +97,12 @@ public class CassandraSinkTester extends SinkTester {
         // build the sink
         cluster = Cluster.builder()
             .addContactPoint("localhost")
-            .withPort(cassandraCluster.getCassandraPort())
+            .withPort(serviceContainer.getCassandraPort())
             .build();
 
         // connect to the cluster
         session = cluster.connect();
-        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
+        log.info("Connecting to cassandra cluster at localhost:{}", 
serviceContainer.getCassandraPort());
 
         String createKeySpace =
             "CREATE KEYSPACE " + keySpace
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
index 0effc8e..eee208e 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
@@ -18,57 +18,57 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.util.Map;
 
-import org.apache.http.Header;
 import org.apache.http.HttpHost;
 import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.testcontainers.containers.GenericContainer;
 
-public class ElasticSearchSinkTester extends SinkTester {
-    
+public class ElasticSearchSinkTester extends 
SinkTester<ElasticSearchContainer> {
+
     private RestHighLevelClient elasticClient;
 
     public ElasticSearchSinkTester() {
-        super(SinkType.ELASTIC_SEARCH);
+        super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH);
         
-        sinkConfig.put("elasticSearchUrl", "http://localhost:9200");
+        sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200");
         sinkConfig.put("indexName", "test-index");
     }
 
+
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
externalServices) {
-        GenericContainer<?> container = 
externalServices.get(ElasticSearchContainer.NAME);
-        checkState(container instanceof ElasticSearchContainer,
-            "No ElasticSearch service found in the cluster");
+    protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
+        return new ElasticSearchContainer(cluster.getClusterName());
     }
 
     @Override
     public void prepareSink() throws Exception {
-        RestClientBuilder builder = RestClient.builder(new 
HttpHost("localhost", 9200, "http"));
+        RestClientBuilder builder = RestClient.builder(
+            new HttpHost(
+                "localhost",
+                serviceContainer.getMappedPort(9200),
+                "http"));
         elasticClient = new RestHighLevelClient(builder);
     }
 
     @Override
     public void validateSinkResult(Map<String, String> kvs) {
-        
         SearchRequest searchRequest = new SearchRequest("test-index");
         searchRequest.types("doc");
         
         try {
-            Header headers = null;
-            SearchResponse searchResult = elasticClient.search(searchRequest, 
headers);
-            assertTrue(searchResult.getHits().getTotalHits() > 0);
+            SearchResponse searchResult = elasticClient.search(searchRequest);
+            assertTrue(searchResult.getHits().getTotalHits() > 0, 
searchResult.toString());
         } catch (Exception e) {
-            e.printStackTrace();
+            fail("Encountered exception on validating elastic search results", e);
         }
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
index 46c5f24..957b93a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -21,18 +21,14 @@ package org.apache.pulsar.tests.integration.io;
 import java.util.Map;
 
 import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
-import static com.google.common.base.Preconditions.checkState;
-
-public class HdfsSinkTester extends SinkTester {
+public class HdfsSinkTester extends SinkTester<HdfsContainer> {
        
        private static final String NAME = "HDFS";
        
-       private HdfsContainer hdfsCluster;
-       
        public HdfsSinkTester() {
-               super(SinkType.HDFS);
+               super(NAME, SinkType.HDFS);
                
                // TODO How do I get the core-site.xml, and hdfs-site.xml files 
from the container?
                sinkConfig.put("hdfsConfigResources", "");
@@ -40,20 +36,18 @@ public class HdfsSinkTester extends SinkTester {
        }
 
        @Override
-       public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-               GenericContainer<?> container = containers.get(NAME);   
-               checkState(container instanceof HdfsContainer, "No HDFS service 
found in the cluster");
-           this.hdfsCluster = (HdfsContainer) container;
+       protected HdfsContainer createSinkService(PulsarCluster cluster) {
+               return new HdfsContainer(cluster.getClusterName());
        }
 
        @Override
        public void prepareSink() throws Exception {
                // Create the test directory
-               hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", 
"/tmp/testing");
-               hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", 
"tester:testing", "/tmp/testing");
+               serviceContainer.execInContainer("/hadoop/bin/hdfs","dfs", 
"-mkdir", "/tmp/testing");
+               serviceContainer.execInContainer("/hadoop/bin/hdfs", "-chown", 
"tester:testing", "/tmp/testing");
                
                // Execute all future commands as the "tester" user
-               hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+               serviceContainer.execInContainer("export HADOOP_USER_NAME=tester");
        }
 
        @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 7c14ba9..72d9b01 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
+import com.github.dockerjava.api.command.CreateContainerCmd;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Map;
+import java.util.function.Consumer;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.MySQLContainer;
 
 /**
@@ -41,7 +42,7 @@ import org.testcontainers.containers.MySQLContainer;
  * This will use MySql as DB server
  */
 @Slf4j
-public class JdbcSinkTester extends SinkTester {
+public class JdbcSinkTester extends SinkTester<MySQLContainer> {
 
     /**
      * A Simple class to test jdbc class,
@@ -57,14 +58,14 @@ public class JdbcSinkTester extends SinkTester {
     }
 
     private static final String NAME = "jdbc";
+    private static final String MYSQL = "mysql";
 
-    private MySQLContainer mySQLContainer;
     private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
     private String tableName = "test";
     private Connection connection;
 
     public JdbcSinkTester() {
-        super(SinkType.JDBC);
+        super(NAME, SinkType.JDBC);
 
         // container default value is test
         sinkConfig.put("userName", "test");
@@ -79,21 +80,28 @@ public class JdbcSinkTester extends SinkTester {
     }
 
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get("mysql");
-        checkState(container instanceof MySQLContainer,
-            "No MySQL service found in the cluster");
-
-        this.mySQLContainer = (MySQLContainer) container;
-        log.info("find sink service container: {}", 
mySQLContainer.getContainerName());
+    protected MySQLContainer createSinkService(PulsarCluster cluster) {
+        return (MySQLContainer) new MySQLContainer()
+            .withUsername("test")
+            .withPassword("test")
+            .withDatabaseName("test")
+            .withNetworkAliases(MYSQL)
+            .withCreateContainerCmdModifier(new Consumer<CreateContainerCmd>() 
{
+                @Override
+                public void accept(CreateContainerCmd createContainerCmd) {
+                    createContainerCmd
+                        .withName(MYSQL)
+                        .withHostName(cluster.getClusterName() + "-" + MYSQL);
+                }
+            });
     }
 
     @Override
     public void prepareSink() throws Exception {
-        String jdbcUrl = mySQLContainer.getJdbcUrl();
+        String jdbcUrl = serviceContainer.getJdbcUrl();
         // we need set mysql server address in cluster network.
-        sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
-        String driver = mySQLContainer.getDriverClassName();
+        sinkConfig.put("jdbcUrl", "jdbc:mysql://" + MYSQL + ":3306/test");
+        String driver = serviceContainer.getDriverClassName();
         Class.forName(driver);
 
         connection = DriverManager.getConnection(jdbcUrl, "test", "test");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index ff79e1a..6713cc1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -32,8 +31,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
@@ -41,18 +40,15 @@ import 
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
  * A tester for testing kafka sink.
  */
 @Slf4j
-public class KafkaSinkTester extends SinkTester {
+public class KafkaSinkTester extends SinkTester<KafkaContainer> {
 
     private static final String NAME = "kafka";
 
     private final String kafkaTopicName;
-
-    private KafkaContainer kafkaContainer;
-
     private KafkaConsumer<String, String> kafkaConsumer;
 
     public KafkaSinkTester() {
-        super(SinkType.KAFKA);
+        super(NAME, SinkType.KAFKA);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
@@ -64,17 +60,19 @@ public class KafkaSinkTester extends SinkTester {
     }
 
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof KafkaContainer,
-            "No kafka service found in the cluster");
-
-        this.kafkaContainer = (KafkaContainer) container;
+    protected KafkaContainer createSinkService(PulsarCluster cluster) {
+        final String kafkaServiceName = NAME;
+        return new KafkaContainer()
+                .withEmbeddedZookeeper()
+                .withNetworkAliases(kafkaServiceName)
+                .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                    .withName(kafkaServiceName)
+                    .withHostName(cluster.getClusterName() + "-" + 
kafkaServiceName));
     }
 
     @Override
     public void prepareSink() throws Exception {
-        ExecResult execResult = kafkaContainer.execInContainer(
+        ExecResult execResult = serviceContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
             "--zookeeper",
@@ -91,7 +89,7 @@ public class KafkaSinkTester extends SinkTester {
 
         kafkaConsumer = new KafkaConsumer<>(
             ImmutableMap.of(
-                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
serviceContainer.getBootstrapServers(),
                 ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
             ),
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index 4928f00..cee17b1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertTrue;
 
@@ -35,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
@@ -43,7 +41,7 @@ import 
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
  * A tester for testing kafka source.
  */
 @Slf4j
-public class KafkaSourceTester extends SourceTester {
+public class KafkaSourceTester extends SourceTester<KafkaContainer> {
 
     private static final String NAME = "kafka";
 
@@ -68,12 +66,8 @@ public class KafkaSourceTester extends SourceTester {
     }
 
     @Override
-    public void findSourceServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof KafkaContainer,
-            "No kafka service found in the cluster");
-
-        this.kafkaContainer = (KafkaContainer) container;
+    public void setServiceContainer(KafkaContainer container) {
+        this.kafkaContainer = container;
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index d2917e6..2dd0759 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io;
 import java.util.Map;
 import lombok.Getter;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
@@ -28,7 +29,7 @@ import org.testng.collections.Maps;
  * A tester used for testing a specific sink.
  */
 @Getter
-public abstract class SinkTester {
+public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
 
     public enum SinkType {
         UNDEFINED,
@@ -39,19 +40,23 @@ public abstract class SinkTester {
         ELASTIC_SEARCH
     }
 
+    protected final String networkAlias;
     protected final SinkType sinkType;
     protected final String sinkArchive;
     protected final String sinkClassName;
     protected final Map<String, Object> sinkConfig;
+    protected ServiceContainerT serviceContainer;
 
-    public SinkTester(SinkType sinkType) {
+    public SinkTester(String networkAlias, SinkType sinkType) {
+        this.networkAlias = networkAlias;
         this.sinkType = sinkType;
         this.sinkArchive = null;
         this.sinkClassName = null;
         this.sinkConfig = Maps.newHashMap();
     }
 
-    public SinkTester(String sinkArchive, String sinkClassName) {
+    public SinkTester(String networkAlias, String sinkArchive, String 
sinkClassName) {
+        this.networkAlias = networkAlias;
         this.sinkType = SinkType.UNDEFINED;
         this.sinkArchive = sinkArchive;
         this.sinkClassName = sinkClassName;
@@ -62,7 +67,19 @@ public abstract class SinkTester {
         return Schema.STRING;
     }
 
-    public abstract void findSinkServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
+    protected abstract ServiceContainerT createSinkService(PulsarCluster 
cluster);
+
+    public ServiceContainerT startServiceContainer(PulsarCluster cluster) {
+        this.serviceContainer = createSinkService(cluster);
+        cluster.startService(networkAlias, serviceContainer);
+        return serviceContainer;
+    }
+
+    public void stopServiceContainer(PulsarCluster cluster) {
+        if (null != serviceContainer) {
+            cluster.stopService(networkAlias, serviceContainer);
+        }
+    }
 
     public SinkType sinkType() {
         return sinkType;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index dc58f2f..f1feb70 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -27,7 +27,7 @@ import org.testng.collections.Maps;
  * A tester used for testing a specific source.
  */
 @Getter
-public abstract class SourceTester {
+public abstract class SourceTester<ServiceContainerT extends GenericContainer> 
{
 
     protected final String sourceType;
     protected final Map<String, Object> sourceConfig;
@@ -37,7 +37,7 @@ public abstract class SourceTester {
         this.sourceConfig = Maps.newHashMap();
     }
 
-    public abstract void findSourceServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
+    public abstract void setServiceContainer(ServiceContainerT 
serviceContainer);
 
     public String sourceType() {
         return sourceType;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index b4a6b83..20b9da0 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -18,19 +18,10 @@
  */
 package org.apache.pulsar.tests.integration.suites;
 
-import java.util.Map;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
-import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.MySQLContainer;
 import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
-import org.testng.collections.Maps;
 
 public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
 
@@ -47,47 +38,6 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
     }
 
     @Override
-    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, 
PulsarClusterSpecBuilder specBuilder) {
-        PulsarClusterSpecBuilder builder = 
super.beforeSetupCluster(clusterName, specBuilder);
-
-        // start functions
-
-        // register external services
-        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-
-        final String kafkaServiceName = "kafka";
-        externalServices.put(
-            kafkaServiceName,
-            new KafkaContainer()
-                .withEmbeddedZookeeper()
-                .withNetworkAliases(kafkaServiceName)
-                .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
-                    .withName(kafkaServiceName)
-                    .withHostName(clusterName + "-" + kafkaServiceName)));
-
-        final String cassandraServiceName = "cassandra";
-        externalServices.put(
-            cassandraServiceName,
-            new CassandraContainer(clusterName));
-
-        // use mySQL for jdbc test
-        final String jdbcServiceName = "mysql";
-        externalServices.put(
-            jdbcServiceName,
-            new MySQLContainer()
-                .withExposedPorts(3306));
-        
-        externalServices.put(
-                ElasticSearchContainer.NAME, 
-                new ElasticSearchContainer(ElasticSearchContainer.NAME)
-                .withExposedPorts(9200));
-
-        builder = builder.externalServices(externalServices);
-
-        return builder;
-    }
-
-    @Override
     public String getTestName() {
         return "pulsar-test-suite";
     }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index f78097b..af60712 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -209,6 +209,23 @@ public class PulsarCluster {
         }
     }
 
+    public void startService(String networkAlias,
+                             GenericContainer<?> serviceContainer) {
+        log.info("Starting external service {} ...", networkAlias);
+        serviceContainer.withNetwork(network);
+        serviceContainer.withNetworkAliases(networkAlias);
+        serviceContainer.start();
+        log.info("Successfully start external service {}", networkAlias);
+    }
+
+    public void stopService(String networkAlias,
+                            GenericContainer<?> serviceContainer) {
+        log.info("Stopping external service {} ...", networkAlias);
+        serviceContainer.stop();
+        log.info("Successfully stop external service {}", networkAlias);
+    }
+
+
     private static <T extends PulsarContainer> Map<String, T> 
runNumContainers(String serviceName,
                                                                                
int numContainers,
                                                                                
Function<String, T> containerCreator) {

Reply via email to