This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f7022df Add Presto Sql Test (#2504) f7022df is described below commit f7022df6cc5c443cb56dfe133e81a77ff9b7ad72 Author: Ali Ahmed <alahmed...@gmail.com> AuthorDate: Tue Oct 2 13:49:29 2018 -0700 Add Presto Sql Test (#2504) * Add Presto SQL Test Add Pulsar Presto Container * Add prest query test Fix netty dependency --- pom.xml | 11 ++ .../docker-images/latest-version-image/Dockerfile | 4 +- .../{Dockerfile => conf/presto_worker.conf} | 23 ++-- .../{Dockerfile => scripts/run-presto-worker.sh} | 22 ++-- tests/integration/pom.xml | 9 +- .../containers/PrestoWorkerContainer.java | 39 +++++++ .../pulsar/tests/integration/presto/Stock.java | 82 +++++++++++++ .../tests/integration/presto/TestBasicPresto.java | 127 +++++++++++++++++++++ .../integration/topologies/PulsarCluster.java | 76 +++++++++--- .../integration/topologies/PulsarClusterSpec.java | 9 ++ 10 files changed, 356 insertions(+), 46 deletions(-) diff --git a/pom.xml b/pom.xml index ddc7dee..27a44c1 100644 --- a/pom.xml +++ b/pom.xml @@ -843,6 +843,11 @@ flexible messaging model and an intuitive client API.</description> <artifactId>cassandra-driver-core</artifactId> <version>${cassandra.version}</version> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>3.11.1</version> + </dependency> </dependencies> </dependencyManagement> @@ -880,6 +885,12 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.0</version> diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index 862b53c..491a913 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ 
b/tests/docker-images/latest-version-image/Dockerfile @@ -25,11 +25,11 @@ RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsa COPY conf/supervisord.conf /etc/supervisord.conf COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \ - conf/proxy.conf /etc/supervisord/conf.d/ + conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/ COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \ ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/ COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \ - scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \ + scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \ /pulsar/bin/ diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/conf/presto_worker.conf similarity index 52% copy from tests/docker-images/latest-version-image/Dockerfile copy to tests/docker-images/latest-version-image/conf/presto_worker.conf index 862b53c..74172ae 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/conf/presto_worker.conf @@ -17,19 +17,10 @@ # under the License. 
# -FROM apachepulsar/pulsar-all:latest - -RUN apt-get update && apt-get install -y supervisor - -RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsar/ssl - -COPY conf/supervisord.conf /etc/supervisord.conf -COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \ - conf/proxy.conf /etc/supervisord/conf.d/ - -COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \ - ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/ - -COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \ - scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \ - /pulsar/bin/ +[program:presto-worker] +autostart=false +redirect_stderr=true +stdout_logfile=/var/log/pulsar/presto_worker.log +directory=/pulsar +environment=PULSAR_MEM=-Xms128M +command=/pulsar/bin/pulsar sql-worker start \ No newline at end of file diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh old mode 100644 new mode 100755 similarity index 52% copy from tests/docker-images/latest-version-image/Dockerfile copy to tests/docker-images/latest-version-image/scripts/run-presto-worker.sh index 862b53c..a78f955 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh @@ -1,3 +1,4 @@ +#!/usr/bin/env bash # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -17,19 +18,12 @@ # under the License. 
# -FROM apachepulsar/pulsar-all:latest +bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \ + bin/apply-config-from-env.py conf/pulsar_env.sh -RUN apt-get update && apt-get install -y supervisor +if [ -z "$NO_AUTOSTART" ]; then + sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/presto_worker.conf +fi -RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsar/ssl - -COPY conf/supervisord.conf /etc/supervisord.conf -COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \ - conf/proxy.conf /etc/supervisord/conf.d/ - -COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \ - ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/ - -COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \ - scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \ - /pulsar/bin/ +bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w +exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index f4712c8..fc5ae42 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -77,6 +77,12 @@ <dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </exclusion> + </exclusions> <scope>test</scope> </dependency> <dependency> @@ -116,6 +122,7 @@ <artifactId>jackson-databind</artifactId> <scope>test</scope> </dependency> + <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> @@ -127,7 +134,7 @@ <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.3.2</version> </dependency> - + </dependencies> <build> diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java new file mode 100644 index 0000000..edb0a0c --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.containers; + +/** + * A pulsar container that runs the presto worker + */ +public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer> { + + public static final String NAME = "presto-worker"; + public static final int PRESTO_HTTP_PORT = 8081; + + public PrestoWorkerContainer(String clusterName, String hostname) { + super( + clusterName, + hostname, + hostname, + "bin/run-presto-worker.sh", + -1, + PRESTO_HTTP_PORT, + "/v1/node"); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java new file mode 100644 index 0000000..f5d85f1 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.presto; + +import java.util.Objects; + +public class Stock { + + private int entryId; + private String symbol; + private double sharePrice; + + public Stock(int entryId, String symbol, double sharePrice) { + this.entryId = entryId; + this.symbol = symbol; + this.sharePrice = sharePrice; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Stock stock = (Stock) o; + return entryId == stock.entryId && + Double.compare(stock.sharePrice, sharePrice) == 0 && + Objects.equals(symbol, stock.symbol); + } + + @Override + public int hashCode() { + return Objects.hash(symbol, sharePrice); + } + + @Override + public String toString() { + return "Stock{" + + "entryId=" + entryId + + ", symbol='" + symbol + '\'' + + ", sharePrice=" + sharePrice + + '}'; + } + + public int getEntryId() { + return entryId; + } + + public void setEntryId(int entryId) { + this.entryId = entryId; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public double getSharePrice() { + return sharePrice; + } + + public void setSharePrice(double sharePrice) { + this.sharePrice = sharePrice; + } +} \ No newline at end of file diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java new file mode 100644 index 0000000..0007e3d --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.presto; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.util.stream.Stream; + +import static java.util.stream.Collectors.joining; +import static org.assertj.core.api.Assertions.assertThat; + +@Slf4j +public class TestBasicPresto extends PulsarClusterTestBase { + + private static final int NUM_OF_STOCKS = 10; + + @BeforeSuite + @Override + public void setupCluster() throws Exception { + final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) + .filter(s -> s != null && !s.isEmpty()) + .collect(joining("-")); + + PulsarClusterSpec spec = PulsarClusterSpec.builder() + .numBookies(2) + .numBrokers(1) + .enablePrestoWorker(true) + .clusterName(clusterName) + .build(); + + log.info("Setting up cluster {} with {} bookies, {} 
brokers", + spec.clusterName(), spec.numBookies(), spec.numBrokers()); + + pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + log.info("Cluster {} is setup with presto worker", spec.clusterName()); + } + + @Test + public void testDefaultCatalog() throws Exception { + ContainerExecResult containerExecResult = execQuery("show catalogs;"); + assertThat(containerExecResult.getExitCode()).isEqualTo(0); + assertThat(containerExecResult.getStdout()).contains("pulsar", "system"); + } + + @Test + public void testSimpleSQLQuery() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + + final String stocksTopic = "stocks"; + + @Cleanup + Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) + .topic(stocksTopic) + .create(); + + + for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { + final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10); + producer.send(stock); + } + + ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;"); + assertThat(containerExecResult.getExitCode()).isEqualTo(0); + log.info("select sql query output \n{}", containerExecResult.getStdout()); + String[] split = containerExecResult.getStdout().split("\n"); + assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2); + + String[] split2 = containerExecResult.getStdout().split("\n|,"); + + for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) { + assertThat(split2).contains("\"" + i + "\""); + assertThat(split2).contains("\"" + "STOCK_" + i + "\""); + assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); + } + + } + + @AfterSuite + @Override + public void tearDownCluster() { + super.tearDownCluster(); + } + + public static ContainerExecResult execQuery(final String query) throws Exception { + ContainerExecResult containerExecResult; + + containerExecResult = pulsarCluster.getPrestoWorkerContainer() + 
.execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); + + return containerExecResult; + + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index af60712..8ee2f57 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -25,19 +25,21 @@ import static org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; -import java.util.stream.Stream; +import java.util.stream.Collectors; -import com.google.common.collect.Streams; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.tests.integration.containers.BKContainer; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.containers.CSContainer; +import org.apache.pulsar.tests.integration.containers.PrestoWorkerContainer; import org.apache.pulsar.tests.integration.containers.ProxyContainer; import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.apache.pulsar.tests.integration.containers.WorkerContainer; @@ -78,13 +80,29 @@ public class PulsarCluster { private final Map<String, BrokerContainer> brokerContainers; private final Map<String, WorkerContainer> workerContainers; private final ProxyContainer proxyContainer; + private final PrestoWorkerContainer prestoWorkerContainer; private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap(); 
+ private final boolean enablePrestoWorker; private PulsarCluster(PulsarClusterSpec spec) { this.spec = spec; this.clusterName = spec.clusterName(); this.network = Network.newNetwork(); + this.enablePrestoWorker = spec.enablePrestoWorker(); + + if (enablePrestoWorker) { + prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME) + .withNetwork(network) + .withNetworkAliases(PrestoWorkerContainer.NAME) + .withEnv("clusterName", clusterName) + .withEnv("zkServers", ZKContainer.NAME) + .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) + .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080"); + } else { + prestoWorkerContainer = null; + } + this.zkContainer = new ZKContainer(clusterName); this.zkContainer @@ -196,6 +214,11 @@ public class PulsarCluster { log.info("\tBinary Service Url : {}", getPlainTextServiceUrl()); log.info("\tHttp Service Url : {}", getHttpServiceUrl()); + if (enablePrestoWorker) { + log.info("Starting Presto Worker"); + prestoWorkerContainer.start(); + } + // start external services this.externalServices = spec.externalServices; if (null != externalServices) { @@ -238,19 +261,32 @@ public class PulsarCluster { return containers; } + public PrestoWorkerContainer getPrestoWorkerContainer() { + return prestoWorkerContainer; + } + public synchronized void stop() { - Stream<GenericContainer> containers = Streams.concat( - workerContainers.values().stream(), - brokerContainers.values().stream(), - bookieContainers.values().stream(), - Stream.of(proxyContainer, csContainer, zkContainer) - ); - if (spec.externalServices() != null) { - containers = Streams.concat(containers, spec.externalServices().values().stream()); + List<GenericContainer> containers = new ArrayList<>(); + + containers.addAll(workerContainers.values()); + containers.addAll(brokerContainers.values()); + containers.addAll(bookieContainers.values()); + + if (externalServices != null) { + 
containers.addAll(externalServices.values()); } - containers.parallel().forEach(GenericContainer::stop); + containers.add(proxyContainer); + containers.add(csContainer); + containers.add(zkContainer); + containers.add(prestoWorkerContainer); + + containers.parallelStream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + containers.parallelStream().forEach(GenericContainer::stop); try { network.close(); @@ -349,11 +385,19 @@ public class PulsarCluster { } public BrokerContainer getAnyBroker() { - return getAnyContainer(brokerContainers, "broker"); + return getAnyContainer(brokerContainers, "pulsar-broker"); } public synchronized WorkerContainer getAnyWorker() { - return getAnyContainer(workerContainers, "functions-worker"); + return getAnyContainer(workerContainers, "pulsar-functions-worker"); + } + + public BrokerContainer getBroker(int index) { + return getAnyContainer(brokerContainers, "pulsar-broker", index); + } + + public synchronized WorkerContainer getWorker(int index) { + return getAnyContainer(workerContainers, "pulsar-functions-worker", index); } private <T> T getAnyContainer(Map<String, T> containers, String serviceName) { @@ -364,6 +408,12 @@ public class PulsarCluster { return containerList.get(0); } + private <T> T getAnyContainer(Map<String, T> containers, String serviceName, int index) { + checkArgument(!containers.isEmpty(), "No " + serviceName + " is alive"); + checkArgument((index >= 0 && index < containers.size()), "Index : " + index + " is out range"); + return containers.get(serviceName.toLowerCase() + "-" + index); + } + public Collection<BrokerContainer> getBrokers() { return brokerContainers.values(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index f49200e..3aeb6c6 100644 --- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -78,6 +78,15 @@ public class PulsarClusterSpec { @Default int numFunctionWorkers = 0; + + /** + * Enable a Presto Worker Node + * + * @return the flag whether presto worker is enabled + */ + @Default + boolean enablePrestoWorker = false; + /** * Returns the function runtime type. *