This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f7022df  Add Presto Sql Test (#2504)
f7022df is described below

commit f7022df6cc5c443cb56dfe133e81a77ff9b7ad72
Author: Ali Ahmed <alahmed...@gmail.com>
AuthorDate: Tue Oct 2 13:49:29 2018 -0700

    Add Presto Sql Test (#2504)
    
    * Add Presto SQL Test
    
    Add Pulsar Presto Container
    
    * Add prest query test
    
    Fix netty dependency
---
 pom.xml                                            |  11 ++
 .../docker-images/latest-version-image/Dockerfile  |   4 +-
 .../{Dockerfile => conf/presto_worker.conf}        |  23 ++--
 .../{Dockerfile => scripts/run-presto-worker.sh}   |  22 ++--
 tests/integration/pom.xml                          |   9 +-
 .../containers/PrestoWorkerContainer.java          |  39 +++++++
 .../pulsar/tests/integration/presto/Stock.java     |  82 +++++++++++++
 .../tests/integration/presto/TestBasicPresto.java  | 127 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  76 +++++++++---
 .../integration/topologies/PulsarClusterSpec.java  |   9 ++
 10 files changed, 356 insertions(+), 46 deletions(-)

diff --git a/pom.xml b/pom.xml
index ddc7dee..27a44c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -843,6 +843,11 @@ flexible messaging model and an intuitive client 
API.</description>
         <artifactId>cassandra-driver-core</artifactId>
         <version>${cassandra.version}</version>
       </dependency>
+      <dependency>
+       <groupId>org.assertj</groupId>
+       <artifactId>assertj-core</artifactId>
+       <version>3.11.1</version>
+         </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -880,6 +885,12 @@ flexible messaging model and an intuitive client 
API.</description>
     </dependency>
 
     <dependency>
+       <groupId>org.assertj</groupId>
+       <artifactId>assertj-core</artifactId>
+       <scope>test</scope>
+        </dependency>
+
+    <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <version>1.18.0</version>
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index 862b53c..491a913 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -25,11 +25,11 @@ RUN mkdir -p /var/log/pulsar && mkdir -p 
/var/run/supervisor/ && mkdir -p /pulsa
 
 COPY conf/supervisord.conf /etc/supervisord.conf
 COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
-     conf/proxy.conf /etc/supervisord/conf.d/
+     conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/
 
 COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
      ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
 
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh \
+     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh 
scripts/run-presto-worker.sh \
      /pulsar/bin/
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/conf/presto_worker.conf
similarity index 52%
copy from tests/docker-images/latest-version-image/Dockerfile
copy to tests/docker-images/latest-version-image/conf/presto_worker.conf
index 862b53c..74172ae 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/conf/presto_worker.conf
@@ -17,19 +17,10 @@
 # under the License.
 #
 
-FROM apachepulsar/pulsar-all:latest
-
-RUN apt-get update && apt-get install -y supervisor
-
-RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p 
/pulsar/ssl
-
-COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
-     conf/proxy.conf /etc/supervisord/conf.d/
-
-COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
-     ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
-
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh \
-     /pulsar/bin/
+[program:presto-worker]
+autostart=false
+redirect_stderr=true
+stdout_logfile=/var/log/pulsar/presto_worker.log
+directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar sql-worker start
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
old mode 100644
new mode 100755
similarity index 52%
copy from tests/docker-images/latest-version-image/Dockerfile
copy to tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
index 862b53c..a78f955
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,19 +18,12 @@
 # under the License.
 #
 
-FROM apachepulsar/pulsar-all:latest
+bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \
+    bin/apply-config-from-env.py conf/pulsar_env.sh
 
-RUN apt-get update && apt-get install -y supervisor
+if [ -z "$NO_AUTOSTART" ]; then
+    sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/presto_worker.conf
+fi
 
-RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p 
/pulsar/ssl
-
-COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
-     conf/proxy.conf /etc/supervisord/conf.d/
-
-COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
-     ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
-
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh \
-     /pulsar/bin/
+bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
+exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index f4712c8..fc5ae42 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -77,6 +77,12 @@
     <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -116,6 +122,7 @@
       <artifactId>jackson-databind</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
@@ -127,7 +134,7 @@
          <artifactId>elasticsearch-rest-high-level-client</artifactId>
          <version>6.3.2</version>
        </dependency>
-       
+
   </dependencies>
 
   <build>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
new file mode 100644
index 0000000..edb0a0c
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+/**
+ * A pulsar container that runs the presto worker
+ */
+public class PrestoWorkerContainer extends 
PulsarContainer<PrestoWorkerContainer> {
+
+    public static final String NAME = "presto-worker";
+    public static final int PRESTO_HTTP_PORT = 8081;
+
+    public PrestoWorkerContainer(String clusterName, String hostname) {
+        super(
+                clusterName,
+                hostname,
+                hostname,
+                "bin/run-presto-worker.sh",
+                -1,
+                PRESTO_HTTP_PORT,
+                "/v1/node");
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java
new file mode 100644
index 0000000..f5d85f1
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import java.util.Objects;
+
+public class Stock {
+
+    private int entryId;
+    private String symbol;
+    private double sharePrice;
+
+    public Stock(int entryId, String symbol, double sharePrice) {
+        this.entryId = entryId;
+        this.symbol = symbol;
+        this.sharePrice = sharePrice;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Stock stock = (Stock) o;
+        return entryId == stock.entryId &&
+                Double.compare(stock.sharePrice, sharePrice) == 0 &&
+                Objects.equals(symbol, stock.symbol);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(symbol, sharePrice);
+    }
+
+    @Override
+    public String toString() {
+        return "Stock{" +
+                "entryId=" + entryId +
+                ", symbol='" + symbol + '\'' +
+                ", sharePrice=" + sharePrice +
+                '}';
+    }
+
+    public int getEntryId() {
+        return entryId;
+    }
+
+    public void setEntryId(int entryId) {
+        this.entryId = entryId;
+    }
+
+    public String getSymbol() {
+        return symbol;
+    }
+
+    public void setSymbol(String symbol) {
+        this.symbol = symbol;
+    }
+
+    public double getSharePrice() {
+        return sharePrice;
+    }
+
+    public void setSharePrice(double sharePrice) {
+        this.sharePrice = sharePrice;
+    }
+}
\ No newline at end of file
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
new file mode 100644
index 0000000..0007e3d
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Slf4j
+public class TestBasicPresto extends PulsarClusterTestBase {
+
+    private static final int NUM_OF_STOCKS = 10;
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .enablePrestoWorker(true)
+                .clusterName(clusterName)
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup with presto worker", spec.clusterName());
+    }
+
+    @Test
+    public void testDefaultCatalog() throws Exception {
+        ContainerExecResult containerExecResult = execQuery("show catalogs;");
+        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+        assertThat(containerExecResult.getStdout()).contains("pulsar", 
"system");
+    }
+
+    @Test
+    public void testSimpleSQLQuery() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                                    
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                                    .build();
+
+        final String stocksTopic = "stocks";
+
+        @Cleanup
+        Producer<Stock> producer = 
pulsarClient.newProducer(JSONSchema.of(Stock.class))
+                .topic(stocksTopic)
+                .create();
+
+
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
+            producer.send(stock);
+        }
+
+        ContainerExecResult containerExecResult = execQuery("select * from 
pulsar.\"public/default\".stocks order by entryid;");
+        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+        log.info("select sql query output \n{}", 
containerExecResult.getStdout());
+        String[] split = containerExecResult.getStdout().split("\n");
+        assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
+
+        String[] split2 = containerExecResult.getStdout().split("\n|,");
+
+        for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
+            assertThat(split2).contains("\"" + i + "\"");
+            assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
+        }
+
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    public static ContainerExecResult execQuery(final String query) throws 
Exception {
+        ContainerExecResult containerExecResult;
+
+        containerExecResult = pulsarCluster.getPrestoWorkerContainer()
+                .execCmd("/bin/bash", "-c", 
PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
+
+        return containerExecResult;
+
+    }
+
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index af60712..8ee2f57 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -25,19 +25,21 @@ import static 
org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
 
-import com.google.common.collect.Streams;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.BKContainer;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.apache.pulsar.tests.integration.containers.PrestoWorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ProxyContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
@@ -78,13 +80,29 @@ public class PulsarCluster {
     private final Map<String, BrokerContainer> brokerContainers;
     private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
+    private final PrestoWorkerContainer prestoWorkerContainer;
     private Map<String, GenericContainer<?>> externalServices = 
Collections.emptyMap();
+    private final boolean enablePrestoWorker;
 
     private PulsarCluster(PulsarClusterSpec spec) {
 
         this.spec = spec;
         this.clusterName = spec.clusterName();
         this.network = Network.newNetwork();
+        this.enablePrestoWorker = spec.enablePrestoWorker();
+
+        if (enablePrestoWorker) {
+            prestoWorkerContainer = new PrestoWorkerContainer(clusterName, 
PrestoWorkerContainer.NAME)
+                    .withNetwork(network)
+                    .withNetworkAliases(PrestoWorkerContainer.NAME)
+                    .withEnv("clusterName", clusterName)
+                    .withEnv("zkServers", ZKContainer.NAME)
+                    .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + 
ZKContainer.ZK_PORT)
+                    .withEnv("pulsar.broker-service-url", 
"http://pulsar-broker-0:8080";);
+        } else {
+            prestoWorkerContainer = null;
+        }
+
 
         this.zkContainer = new ZKContainer(clusterName);
         this.zkContainer
@@ -196,6 +214,11 @@ public class PulsarCluster {
         log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
         log.info("\tHttp Service Url : {}", getHttpServiceUrl());
 
+        if (enablePrestoWorker) {
+            log.info("Starting Presto Worker");
+            prestoWorkerContainer.start();
+        }
+
         // start external services
         this.externalServices = spec.externalServices;
         if (null != externalServices) {
@@ -238,19 +261,32 @@ public class PulsarCluster {
         return containers;
     }
 
+    public PrestoWorkerContainer getPrestoWorkerContainer() {
+        return prestoWorkerContainer;
+    }
+
     public synchronized void stop() {
-        Stream<GenericContainer> containers = Streams.concat(
-                workerContainers.values().stream(),
-                brokerContainers.values().stream(),
-                bookieContainers.values().stream(),
-                Stream.of(proxyContainer, csContainer, zkContainer)
-        );
 
-        if (spec.externalServices() != null) {
-            containers = Streams.concat(containers, 
spec.externalServices().values().stream());
+        List<GenericContainer> containers = new ArrayList<>();
+
+        containers.addAll(workerContainers.values());
+        containers.addAll(brokerContainers.values());
+        containers.addAll(bookieContainers.values());
+
+        if (externalServices != null) {
+            containers.addAll(externalServices.values());
         }
 
-        containers.parallel().forEach(GenericContainer::stop);
+        containers.add(proxyContainer);
+        containers.add(csContainer);
+        containers.add(zkContainer);
+        containers.add(prestoWorkerContainer);
+
+        containers.parallelStream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        containers.parallelStream().forEach(GenericContainer::stop);
 
         try {
             network.close();
@@ -349,11 +385,19 @@ public class PulsarCluster {
     }
 
     public BrokerContainer getAnyBroker() {
-        return getAnyContainer(brokerContainers, "broker");
+        return getAnyContainer(brokerContainers, "pulsar-broker");
     }
 
     public synchronized WorkerContainer getAnyWorker() {
-        return getAnyContainer(workerContainers, "functions-worker");
+        return getAnyContainer(workerContainers, "pulsar-functions-worker");
+    }
+
+    public BrokerContainer getBroker(int index) {
+        return getAnyContainer(brokerContainers, "pulsar-broker", index);
+    }
+
+    public synchronized WorkerContainer getWorker(int index) {
+        return getAnyContainer(workerContainers, "pulsar-functions-worker", 
index);
     }
 
     private <T> T getAnyContainer(Map<String, T> containers, String 
serviceName) {
@@ -364,6 +408,12 @@ public class PulsarCluster {
         return containerList.get(0);
     }
 
+    private <T> T getAnyContainer(Map<String, T> containers, String 
serviceName, int index) {
+        checkArgument(!containers.isEmpty(), "No " + serviceName + " is 
alive");
+        checkArgument((index >= 0 && index < containers.size()), "Index : " + 
index + " is out range");
+        return containers.get(serviceName.toLowerCase() + "-" + index);
+    }
+
     public Collection<BrokerContainer> getBrokers() {
         return brokerContainers.values();
     }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index f49200e..3aeb6c6 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -78,6 +78,15 @@ public class PulsarClusterSpec {
     @Default
     int numFunctionWorkers = 0;
 
+
+    /**
+     * Enable a Preto Worker Node
+     *
+     * @return the flag whether presto worker is eanbled
+     */
+    @Default
+    boolean enablePrestoWorker = false;
+
     /**
      * Returns the function runtime type.
      *

Reply via email to