This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch refactor_kafka in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 3226348eab493d829215a8091e40a8caf7c0944d Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Wed Jul 17 17:28:20 2019 -0700 Refactor pinot-connectors to break the dependencies from kafka 0.9 --- pinot-common/pom.xml | 4 - pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 7 +- .../impl/kafka/server/KafkaDataProducer.java | 57 +++++++++ .../KafkaDataServerStartable.java} | 136 +++++++++++++-------- pinot-core/pom.xml | 4 - .../core/realtime/stream/StreamDataProducer.java | 35 ++++++ .../core/realtime/stream/StreamDataProvider.java | 45 +++++++ .../realtime/stream/StreamDataServerStartable.java | 33 +++++ .../function/FunctionExpressionEvaluatorTest.java | 1 - pinot-integration-tests/pom.xml | 2 +- .../tests/BaseClusterIntegrationTest.java | 25 ++-- .../pinot/integration/tests/CommonKafkaUtils.java | 102 ++++++++++++++++ .../ControllerPeriodicTasksIntegrationTests.java | 3 +- .../tests/HybridClusterIntegrationTest.java | 3 +- ...ridClusterIntegrationTestCommandLineRunner.java | 13 +- .../tests/RealtimeClusterIntegrationTest.java | 3 +- pinot-perf/pom.xml | 2 +- .../perf/BenchmarkRealtimeConsumptionSpeed.java | 13 +- .../org/apache/pinot/perf/RealtimeStressTest.java | 16 +-- pinot-tools/pom.xml | 3 +- .../org/apache/pinot/tools/HybridQuickstart.java | 40 +++--- .../org/apache/pinot/tools/RealtimeQuickStart.java | 35 ++++-- .../tools/admin/command/StartKafkaCommand.java | 20 ++- .../admin/command/StreamAvroIntoKafkaCommand.java | 25 ++-- .../pinot/tools/streams/AirlineDataStream.java | 21 ++-- .../pinot/tools/streams/MeetupRsvpStream.java | 29 +++-- pom.xml | 9 +- 27 files changed, 503 insertions(+), 183 deletions(-) diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml index 2e27ac8..41c9b39 100644 --- a/pinot-common/pom.xml +++ b/pinot-common/pom.xml @@ -198,10 +198,6 @@ <artifactId>jopt-simple</artifactId> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - <dependency> 
<groupId>nl.jqno.equalsverifier</groupId> <artifactId>equalsverifier</artifactId> <scope>test</scope> diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml index e8f6c93..0450102 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml +++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml @@ -42,7 +42,7 @@ <!-- Kafka --> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${kafka.scala.version}</artifactId> + <artifactId>kafka_2.10</artifactId> <version>${kafka.lib.version}</version> <exclusions> <exclusion> @@ -63,5 +63,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.10.5</version> + </dependency> </dependencies> </project> diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java new file mode 100644 index 0000000..0eb4ac6 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.kafka.server; + +import java.util.Properties; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.pinot.core.realtime.stream.StreamDataProducer; + + +public class KafkaDataProducer implements StreamDataProducer { + Producer<byte[], byte[]> producer; + + @Override + public void init(Properties props) { + ProducerConfig producerConfig = new ProducerConfig(props); + this.producer = new Producer(producerConfig); + } + + @Override + public void produce(String topic, byte[] payload) { + KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, payload); + this.produce(data); + } + + @Override + public void produce(String topic, byte[] key, byte[] payload) { + KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, key, payload); + this.produce(data); + } + + public void produce(KeyedMessage message) { + producer.send(message); + } + + @Override + public void close() { + producer.close(); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java similarity index 63% rename from pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java rename to 
pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java index 5f1de99..dcd44c0 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java +++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.realtime.impl.kafka; +package org.apache.pinot.core.realtime.impl.kafka.server; import java.io.File; import java.security.Permission; @@ -29,17 +29,25 @@ import kafka.server.KafkaServerStartable; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; -/** - * Utilities to start Kafka during unit tests. 
- * - */ -public class KafkaStarterUtils { +public class KafkaDataServerStartable implements StreamDataServerStartable { public static final int DEFAULT_KAFKA_PORT = 19092; public static final int DEFAULT_BROKER_ID = 0; public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka"; public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT; + private static final String PORT = "port"; + private static final String BROKER_ID = "brokerId"; + private static final String ZK_STR = "zkStr"; + private static final String LOG_DIR_PATH = "logDirPath"; + private static final int DEFAULT_TOPIC_PARTITION = 1; + + private KafkaServerStartable serverStartable; + private String zkStr; + private int port; + private int brokerId; + private String logDirPath; public static Properties getDefaultKafkaConfiguration() { final Properties configuration = new Properties(); @@ -50,51 +58,34 @@ public class KafkaStarterUtils { // Set host name configureHostName(configuration, "localhost"); + configuration.put(PORT, DEFAULT_KAFKA_PORT); + configuration.put(BROKER_ID, DEFAULT_BROKER_ID); + configuration.put(ZK_STR, DEFAULT_ZK_STR); + configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random())); + return configuration; } - public static List<KafkaServerStartable> startServers(final int brokerCount, final int port, final String zkStr, + public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr, final Properties configuration) { - List<KafkaServerStartable> startables = new ArrayList<>(brokerCount); + List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount); for (int i = 0; i < brokerCount; i++) { - startables.add(startServer(port + i, i, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration)); + startables.add(startServer(port + i, i, zkStr, configuration)); } - return startables; } - public static KafkaServerStartable startServer(final 
int port, final int brokerId, final String zkStr, + public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr, final Properties configuration) { - return startServer(port, brokerId, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration); - } - - public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr, - final String logDirPath, final Properties configuration) { - // Create the ZK nodes for Kafka, if needed - int indexOfFirstSlash = zkStr.indexOf('/'); - if (indexOfFirstSlash != -1) { - String bareZkUrl = zkStr.substring(0, indexOfFirstSlash); - String zkNodePath = zkStr.substring(indexOfFirstSlash); - ZkClient client = new ZkClient(bareZkUrl); - client.createPersistent(zkNodePath, true); - client.close(); - } - - File logDir = new File(logDirPath); - logDir.mkdirs(); - - configureKafkaPort(configuration, port); - configureZkConnectionString(configuration, zkStr); - configureBrokerId(configuration, brokerId); - configureKafkaLogDirectory(configuration, logDir); - configuration.put("zookeeper.session.timeout.ms", "60000"); - KafkaConfig config = new KafkaConfig(configuration); - - KafkaServerStartable serverStartable = new KafkaServerStartable(config); - serverStartable.startup(); - - return serverStartable; + final KafkaDataServerStartable kafkaDataServerStartable = new KafkaDataServerStartable(); + configuration.put(PORT, port); + configuration.put(BROKER_ID, brokerId); + configuration.put(ZK_STR, zkStr); + configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random())); + kafkaDataServerStartable.init(configuration); + kafkaDataServerStartable.start(); + return kafkaDataServerStartable; } public static void configureSegmentSizeBytes(Properties properties, int segmentSize) { @@ -129,17 +120,6 @@ public class KafkaStarterUtils { configuration.put("host.name", hostName); } - public static void stopServer(KafkaServerStartable 
serverStartable) { - serverStartable.shutdown(); - FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0))); - } - - public static void createTopic(String kafkaTopic, String zkStr, int partitionCount) { - invokeTopicCommand( - new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", "1", "--partitions", Integer.toString( - partitionCount), "--topic", kafkaTopic}); - } - private static void invokeTopicCommand(String[] args) { // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand System.setSecurityManager(new SecurityManager() { @@ -168,4 +148,58 @@ public class KafkaStarterUtils { public static void deleteTopic(String kafkaTopic, String zkStr) { invokeTopicCommand(new String[]{"--delete", "--zookeeper", zkStr, "--topic", kafkaTopic}); } + + public void init(Properties props) { + if (props == null) { + props = getDefaultKafkaConfiguration(); + } + port = Integer.parseInt(props.getProperty(PORT)); + brokerId = Integer.parseInt(props.getProperty(BROKER_ID)); + zkStr = props.getProperty(ZK_STR); + logDirPath = props.getProperty(LOG_DIR_PATH); + + // Create the ZK nodes for Kafka, if needed + int indexOfFirstSlash = zkStr.indexOf('/'); + if (indexOfFirstSlash != -1) { + String bareZkUrl = zkStr.substring(0, indexOfFirstSlash); + String zkNodePath = zkStr.substring(indexOfFirstSlash); + ZkClient client = new ZkClient(bareZkUrl); + client.createPersistent(zkNodePath, true); + client.close(); + } + + File logDir = new File(logDirPath); + logDir.mkdirs(); + + configureKafkaPort(props, port); + configureZkConnectionString(props, zkStr); + configureBrokerId(props, brokerId); + configureKafkaLogDirectory(props, logDir); + props.put("zookeeper.session.timeout.ms", "60000"); + KafkaConfig config = new KafkaConfig(props); + + serverStartable = new KafkaServerStartable(config); + } + + @Override + public void start() { + serverStartable.startup(); + } + + @Override + public void stop() { + serverStartable.shutdown(); + 
FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0))); + } + + @Override + public void createTopic(String topic, Properties props) { + int partitionCount = DEFAULT_TOPIC_PARTITION; + if (props.containsKey("partition")) { + partitionCount = Integer.parseInt(props.getProperty("partition")); + } + invokeTopicCommand( + new String[]{"--create", "--zookeeper", this.zkStr, "--replication-factor", "1", "--partitions", Integer.toString( + partitionCount), "--topic", topic}); + } } diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml index 51c9231..d074639 100644 --- a/pinot-core/pom.xml +++ b/pinot-core/pom.xml @@ -161,10 +161,6 @@ </exclusions> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - <dependency> <groupId>net.sf.jopt-simple</groupId> <artifactId>jopt-simple</artifactId> </dependency> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java new file mode 100644 index 0000000..53275a0 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.stream; + +import java.util.Properties; + + +/** + * StreamDataProducer is the interface for producing data into a stream, e.g. a Kafka-backed producer such as KafkaDataProducer. + */ +public interface StreamDataProducer { + void init(Properties props); + + void produce(String topic, byte[] payload); + + void produce(String topic, byte[] key, byte[] payload); + + void close(); +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java new file mode 100644 index 0000000..aec83a3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.stream; + +import java.util.Properties; + + +public class StreamDataProvider { + public static StreamDataServerStartable getServerDataStartable(String clazz, Properties props) + throws Exception { + final StreamDataServerStartable streamDataServerStartable = + (StreamDataServerStartable) Class.forName(clazz).newInstance(); + streamDataServerStartable.init(props); + return streamDataServerStartable; + } + + public static StreamDataProducer getStreamDataProducer(String clazz, Properties props) + throws Exception { + + final StreamDataProducer streamDataProducer = (StreamDataProducer) Class.forName(clazz).newInstance(); + streamDataProducer.init(props); + return streamDataProducer; + } + + public static StreamDataServerStartable getServerDataStartable(String clazz) + throws Exception { + return getServerDataStartable(clazz, null); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java new file mode 100644 index 0000000..f53e201 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.stream; + +import java.util.Properties; + + +/** + * StreamDataServerStartable is the interface for stream data sources. E.g. KafkaDataServerStartable, KinesisDataServerStartable. + */ +public interface StreamDataServerStartable { + void init(Properties props); + void start(); + void stop(); + + void createTopic(String topic, Properties props); +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java index b6b41ff..dca78da 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java @@ -26,7 +26,6 @@ import org.joda.time.MutableDateTime; import org.joda.time.format.DateTimeFormat; import org.testng.Assert; import org.testng.annotations.Test; -import scala.collection.mutable.StringBuilder; public class FunctionExpressionEvaluatorTest { diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml index b58647f..b54a7d8 100644 --- a/pinot-integration-tests/pom.xml +++ b/pinot-integration-tests/pom.xml @@ -191,7 +191,7 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-connector-kafka-${kafka.version}</artifactId> + <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 67fabc6..faffe9b 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import kafka.server.KafkaServerStartable; import org.apache.commons.io.FileUtils; import org.apache.pinot.client.ConnectionFactory; import org.apache.pinot.common.config.ColumnPartitionConfig; @@ -40,7 +39,7 @@ import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -73,12 +72,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { protected final File _avroDir = new File(_tempDir, "avroDir"); protected final File _segmentDir = new File(_tempDir, "segmentDir"); protected final File _tarDir = new File(_tempDir, "tarDir"); - protected List<KafkaServerStartable> _kafkaStarters; + protected List<StreamDataServerStartable> _kafkaStarters; private org.apache.pinot.client.Connection _pinotConnection; private Connection _h2Connection; private QueryGenerator _queryGenerator; + /** * The following getters can be overridden to change default settings. 
*/ @@ -318,8 +318,9 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { @Override public void run() { try { - ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic, - getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn()); + ClusterIntegrationTestUtils + .pushAvroIntoKafka(avroFiles, CommonKafkaUtils.DEFAULT_KAFKA_BROKER, kafkaTopic, + getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn()); } catch (Exception e) { // Ignored } @@ -328,15 +329,17 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } protected void startKafka() { - _kafkaStarters = KafkaStarterUtils - .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR, - KafkaStarterUtils.getDefaultKafkaConfiguration()); - KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions()); + + _kafkaStarters = + CommonKafkaUtils.startServers(getNumKafkaBrokers(), CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_ZK_STR, + CommonKafkaUtils.getDefaultKafkaConfiguration()); + _kafkaStarters.get(0) + .createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions())); } protected void stopKafka() { - for (KafkaServerStartable kafkaStarter : _kafkaStarters) { - KafkaStarterUtils.stopServer(kafkaStarter); + for (StreamDataServerStartable kafkaStarter : _kafkaStarters) { + kafkaStarter.stop(); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java new file mode 100644 index 0000000..7c6ed83 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; + + +public class CommonKafkaUtils { + public static final int DEFAULT_BROKER_ID = 0; + public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka"; + public static final String PORT = "port"; + public static final String BROKER_ID = "brokerId"; + public static final String ZK_STR = "zkStr"; + public static final String LOG_DIR_PATH = "logDirPath"; + public static int DEFAULT_KAFKA_PORT = 19092; + public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT; + public static final String KAFKA_09_SERVER_STARTABLE_CLASS_NAME = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable"; + + public static Properties getDefaultKafkaConfiguration() { + final Properties configuration = new Properties(); + + // Enable topic deletion by default for integration tests + configureTopicDeletion(configuration, true); + + // Set host name + configureHostName(configuration, 
"localhost"); + + configuration.put(PORT, DEFAULT_KAFKA_PORT); + configuration.put(BROKER_ID, DEFAULT_BROKER_ID); + configuration.put(ZK_STR, DEFAULT_ZK_STR); + configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random())); + + return configuration; + } + + public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) { + configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled)); + } + + public static void configureHostName(Properties configuration, String hostName) { + configuration.put("host.name", hostName); + } + + public static Properties getTopicCreationProps(int numKafkaPartitions) { + Properties topicProps = new Properties(); + topicProps.put("partition", numKafkaPartitions); + return topicProps; + } + + + public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr, + final Properties configuration) { + List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount); + + for (int i = 0; i < brokerCount; i++) { + startables.add(startServer(port + i, i, zkStr, configuration)); + } + return startables; + } + + public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr, + final Properties configuration) { + StreamDataServerStartable kafkaStarter; + + String kafkaClazz = KAFKA_09_SERVER_STARTABLE_CLASS_NAME; + try { + kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + kafkaClazz, e); + } + + configuration.put(CommonKafkaUtils.PORT, port); + configuration.put(CommonKafkaUtils.BROKER_ID, brokerId); + configuration.put(CommonKafkaUtils.ZK_STR, zkStr); + configuration.put(CommonKafkaUtils.LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random())); + + kafkaStarter.init(configuration); + kafkaStarter.start(); + return kafkaStarter; + } +} diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java index fc14789..64fe60f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java @@ -51,7 +51,6 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.ITestContext; @@ -212,7 +211,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat Assert.assertNotNull(outgoingTimeUnit); String timeType = outgoingTimeUnit.toString(); - addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic, + addRealtimeTable(table, useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR, topic, getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index 0983d4e..004cdf2 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -33,7 +33,6 @@ import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -132,7 +131,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet Assert.assertNotNull(outgoingTimeUnit); String timeType = outgoingTimeUnit.toString(); - addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, + addHybridTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java index be24be8..970265c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java @@ -34,13 +34,12 @@ import java.util.concurrent.ThreadPoolExecutor; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; -import kafka.server.KafkaServerStartable; import org.apache.commons.io.FileUtils; import org.apache.pinot.broker.requesthandler.PinotQueryRequest; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.tools.query.comparison.QueryComparison; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -188,7 +187,7 @@ public class HybridClusterIntegrationTestCommandLineRunner { private List<File> _realtimeAvroFiles; private File _queryFile; private File _responseFile; - private KafkaServerStartable _kafkaStarter; + private StreamDataServerStartable _kafkaStarter; private long _countStarResult; public CustomHybridClusterIntegrationTest() { @@ -258,11 +257,11 @@ public class HybridClusterIntegrationTestCommandLineRunner { // Start Zk and Kafka startZk(ZK_PORT); - _kafkaStarter = KafkaStarterUtils.startServer(KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR, - KafkaStarterUtils.getDefaultKafkaConfiguration()); + _kafkaStarter = CommonKafkaUtils.startServer(KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR, + CommonKafkaUtils.getDefaultKafkaConfiguration()); // Create Kafka topic - KafkaStarterUtils.createTopic(getKafkaTopic(), KAFKA_ZK_STR, getNumKafkaPartitions()); + _kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions())); // Start the Pinot cluster ControllerConf config = getDefaultControllerConfiguration(); @@ -379,7 +378,7 @@ public class HybridClusterIntegrationTestCommandLineRunner { stopServer(); stopBroker(); stopController(); - KafkaStarterUtils.stopServer(_kafkaStarter); + _kafkaStarter.stop(); stopZk(); 
FileUtils.deleteDirectory(_tempDir); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java index 8b4baeb..cf833ce 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.data.Schema; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -90,7 +89,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe Assert.assertNotNull(outgoingTimeUnit); String timeType = outgoingTimeUnit.toString(); - addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, + addRealtimeTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml index 74bb09f..2a0f5ed 100644 --- a/pinot-perf/pom.xml +++ b/pinot-perf/pom.xml @@ -52,7 +52,7 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-connector-kafka-${kafka.version}</artifactId> + <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java index 8bbb4d5..11535aa 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java @@ -27,8 +27,9 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import kafka.server.KafkaServerStartable; import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.CommonKafkaUtils; import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest; import org.apache.pinot.util.TestUtils; @@ -58,12 +59,12 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio throws Exception { // Start ZK and Kafka startZk(); - KafkaServerStartable kafkaStarter = KafkaStarterUtils - .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, - KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration()); + StreamDataServerStartable kafkaStarter = CommonKafkaUtils + .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID, + CommonKafkaUtils.DEFAULT_ZK_STR, CommonKafkaUtils.getDefaultKafkaConfiguration()); // Create Kafka topic - KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10); + kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10)); // Unpack data (needed to get the Avro schema) TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl( @@ -93,7 +94,7 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio 
public void run() { try { ClusterIntegrationTestUtils - .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), + .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn()); } catch (Exception e) { // Ignored diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java index 71d28e7..7ffb16a 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java @@ -25,10 +25,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; -import kafka.server.KafkaServerStartable; import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.CommonKafkaUtils; import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest; import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest; import org.apache.pinot.util.TestUtils; @@ -59,12 +59,12 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest { throws Exception { // Start ZK and Kafka startZk(); - KafkaServerStartable kafkaStarter = KafkaStarterUtils - .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, - KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration()); + StreamDataServerStartable kafkaStarter = CommonKafkaUtils + .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID, + CommonKafkaUtils.DEFAULT_ZK_STR, 
CommonKafkaUtils.getDefaultKafkaConfiguration()); // Create Kafka topic - KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10); + kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10)); // Unpack data (needed to get the Avro schema) TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl( @@ -93,7 +93,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest { // Generate ROW_COUNT rows and write them into Kafka ClusterIntegrationTestUtils - .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT, + .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn()); rowsWritten += ROW_COUNT; @@ -115,7 +115,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest { // Write more rows if needed if (rowsWritten - pinotRecordCount < MIN_ROW_COUNT) { ClusterIntegrationTestUtils - .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), + .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn()); rowsWritten += ROW_COUNT; } diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index 6e18295..cf9f507 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -56,8 +56,9 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-connector-kafka-${kafka.version}</artifactId> + <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId> <version>${project.version}</version> + <scope>runtime</scope> </dependency> <dependency> <groupId>commons-cli</groupId> diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index fe004d3..22f7476 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -23,12 +23,13 @@ import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; import java.net.URL; -import kafka.server.KafkaServerStartable; +import java.util.Properties; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.utils.ZkStarter; import org.apache.pinot.core.data.readers.FileFormat; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.AirlineDataStream; @@ -37,16 +38,21 @@ import static org.apache.pinot.tools.Quickstart.printStatus; public class HybridQuickstart { - private HybridQuickstart() { - } - private File _offlineQuickStartDataDir; private File _realtimeQuickStartDataDir; - private KafkaServerStartable _kafkaStarter; + private StreamDataServerStartable _kafkaStarter; private ZkStarter.ZookeeperInstance _zookeeperInstance; private File _schemaFile; private File _dataFile; + private HybridQuickstart() { + } + + public static void main(String[] args) + throws Exception { + new HybridQuickstart().execute(); + } + private QuickstartTableRequest prepareOfflineTableRequest() throws IOException { _offlineQuickStartDataDir = new File("quickStartData" + System.currentTimeMillis()); @@ -94,11 +100,16 @@ public class HybridQuickstart { private void startKafka() { _zookeeperInstance = ZkStarter.startLocalZkServer(); - _kafkaStarter = KafkaStarterUtils - .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID, - KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration()); - - KafkaStarterUtils.createTopic("airlineStatsEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10); + String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable"; + try { + _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + kafkaClazz, e); + } + _kafkaStarter.start(); + Properties topicProps = new Properties(); + topicProps.put("partition", 10); + _kafkaStarter.createTopic("airlineStatsEvents", topicProps); } public void execute() @@ -153,7 +164,7 @@ public class HybridQuickstart { stream.shutdown(); Thread.sleep(2000); runner.stop(); - KafkaStarterUtils.stopServer(_kafkaStarter); + _kafkaStarter.stop(); ZkStarter.stopLocalZkServer(_zookeeperInstance); FileUtils.deleteDirectory(_offlineQuickStartDataDir); FileUtils.deleteDirectory(_realtimeQuickStartDataDir); @@ -163,9 +174,4 @@ public class HybridQuickstart { } }); } - - public static void main(String[] args) - throws Exception { - new HybridQuickstart().execute(); - } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java index 8df1fbb..44661fa 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java @@ -22,10 +22,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.io.File; import java.net.URL; -import kafka.server.KafkaServerStartable; +import java.util.Properties; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.ZkStarter; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; +import 
org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.tools.Quickstart.Color; import org.apache.pinot.tools.admin.command.QuickstartRunner; import org.apache.pinot.tools.streams.MeetupRsvpStream; @@ -35,9 +36,16 @@ import static org.apache.pinot.tools.Quickstart.printStatus; public class RealtimeQuickStart { + private StreamDataServerStartable _kafkaStarter; + private RealtimeQuickStart() { } + public static void main(String[] args) + throws Exception { + new RealtimeQuickStart().execute(); + } + public void execute() throws Exception { final File quickStartDataDir = new File("quickStartData" + System.currentTimeMillis()); @@ -64,10 +72,18 @@ public class RealtimeQuickStart { printStatus(Color.CYAN, "***** Starting Kafka *****"); final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); - final KafkaServerStartable kafkaStarter = KafkaStarterUtils - .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, - KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration()); - KafkaStarterUtils.createTopic("meetupRSVPEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10); + + String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable"; + try { + _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + kafkaClazz, e); + } + _kafkaStarter.start(); + Properties topicProps = new Properties(); + topicProps.put("partition", 10); + _kafkaStarter.createTopic("meetupRSVPEvents", topicProps); + printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****"); runner.startAll(); printStatus(Color.CYAN, "***** Adding meetupRSVP schema *****"); @@ -87,7 +103,7 @@ public class RealtimeQuickStart { printStatus(Color.GREEN, "***** Shutting down realtime quick start *****"); meetupRSVPProvider.stopPublishing(); runner.stop(); - 
KafkaStarterUtils.stopServer(kafkaStarter); + _kafkaStarter.stop(); ZkStarter.stopLocalZkServer(zookeeperInstance); FileUtils.deleteDirectory(quickStartDataDir); } catch (Exception e) { @@ -130,9 +146,4 @@ public class RealtimeQuickStart { printStatus(Color.GREEN, "You can always go to http://localhost:9000/query/ to play around in the query console"); } - - public static void main(String[] args) - throws Exception { - new RealtimeQuickStart().execute(); - } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java index 38f042c..e1e7d40 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java @@ -20,7 +20,8 @@ package org.apache.pinot.tools.admin.command; import java.io.File; import java.io.IOException; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; +import org.apache.pinot.core.realtime.stream.StreamDataServerStartable; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; import org.slf4j.Logger; @@ -32,17 +33,22 @@ import org.slf4j.LoggerFactory; */ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Command { private static final Logger LOGGER = LoggerFactory.getLogger(StartKafkaCommand.class); + private static final int DEFAULT_KAFKA_PORT = 19092; + private static final int DEFAULT_BROKER_ID = 0; + + @Option(name = "-port", required = false, metaVar = "<int>", usage = "Port to start Kafka server on.") - private int _port = KafkaStarterUtils.DEFAULT_KAFKA_PORT; + private int _port = DEFAULT_KAFKA_PORT; @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @Option(name = 
"-brokerId", required = false, metaVar = "<int>", usage = "Kafka broker ID.") - private int _brokerId = KafkaStarterUtils.DEFAULT_BROKER_ID; + private int _brokerId = DEFAULT_BROKER_ID; @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.") private String _zkAddress = "localhost:2181"; + private StreamDataServerStartable _kafkaStarter; @Override public boolean getHelp() { @@ -67,7 +73,13 @@ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Comma @Override public boolean execute() throws IOException { - KafkaStarterUtils.startServer(_port, _brokerId, _zkAddress, KafkaStarterUtils.getDefaultKafkaConfiguration()); + String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable"; + try { + _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz); + } catch (Exception e) { + throw new RuntimeException("Failed to start " + kafkaClazz, e); + } + _kafkaStarter.start(); LOGGER.info("Start kafka at localhost:" + _port + " in thread " + Thread.currentThread().getName()); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java index 0a75023..f0ff30e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java @@ -24,14 +24,12 @@ import java.io.File; import java.io.IOException; import java.util.Properties; import java.util.concurrent.TimeUnit; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.common.utils.HashUtil; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; import 
org.apache.pinot.core.util.AvroUtils; +import org.apache.pinot.core.realtime.stream.StreamDataProducer; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; import org.slf4j.Logger; @@ -42,8 +40,8 @@ import org.slf4j.LoggerFactory; * Class for command to stream Avro data into Kafka. */ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand implements Command { + public static final String DEFAULT_KAFKA_BROKER = "localhost:19092"; private static final Logger LOGGER = LoggerFactory.getLogger(StreamAvroIntoKafkaCommand.class); - @Option(name = "-avroFile", required = true, metaVar = "<String>", usage = "Avro file to stream.") private String _avroFile = null; @@ -51,7 +49,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme private boolean _help = false; @Option(name = "-kafkaBrokerList", required = false, metaVar = "<String>", usage = "Kafka broker list.") - private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER; + private String _kafkaBrokerList = DEFAULT_KAFKA_BROKER; @Option(name = "-kafkaTopic", required = true, metaVar = "<String>", usage = "Kafka topic to stream into.") private String _kafkaTopic = null; @@ -104,8 +102,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); - ProducerConfig producerConfig = new ProducerConfig(properties); - Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig); + String producerClass = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer"; + StreamDataProducer streamDataProducer; + try { + streamDataProducer = StreamDataProvider.getStreamDataProducer(producerClass, properties); + } catch (Exception e) { + throw new RuntimeException("Failed to get StreamDataProducer - " + producerClass, e); 
+ } try { // Open the Avro file DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile)); @@ -115,11 +118,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme // Write the message to Kafka String recordJson = genericRecord.toString(); byte[] bytes = recordJson.getBytes("utf-8"); - KeyedMessage<byte[], byte[]> data = - new KeyedMessage<byte[], byte[]>(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), - bytes); - - producer.send(data); + streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes); // Sleep between messages if (sleepRequired) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java index 7c238bf..9a189cf 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java @@ -27,9 +27,6 @@ import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -37,7 +34,8 @@ import org.apache.pinot.common.data.FieldSpec; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.data.TimeFieldSpec; import org.apache.pinot.common.utils.JsonUtils; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamDataProducer; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; import org.apache.pinot.tools.Quickstart; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,28 +43,29 @@ 
import org.slf4j.LoggerFactory; public class AirlineDataStream { private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class); + private static final String DEFAULT_KAFKA_BROKER = "localhost:19092"; Schema pinotSchema; File avroFile; DataFileStream<GenericRecord> avroDataStream; Integer currentTimeValue = 16102; boolean keepIndexing = true; - private Producer<String, byte[]> producer; ExecutorService service; int counter = 0; + private StreamDataProducer producer; public AirlineDataStream(Schema pinotSchema, File avroFile) - throws FileNotFoundException, IOException { + throws Exception { this.pinotSchema = pinotSchema; this.avroFile = avroFile; createStream(); Properties properties = new Properties(); - properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); + properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); - ProducerConfig producerConfig = new ProducerConfig(properties); - producer = new Producer<String, byte[]>(producerConfig); + producer = StreamDataProvider.getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties); + service = Executors.newFixedThreadPool(1); Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****"); @@ -97,9 +96,7 @@ public class AirlineDataStream { avroDataStream = null; return; } - KeyedMessage<String, byte[]> data = - new KeyedMessage<String, byte[]>("airlineStatsEvents", message.toString().getBytes("UTF-8")); - producer.send(data); + producer.produce("airlineStatsEvents", message.toString().getBytes("UTF-8")); } public void run() { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java index 114072f..c4e1539 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java @@ -23,40 +23,40 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.File; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.Properties; import javax.websocket.ClientEndpointConfig; import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; import javax.websocket.MessageHandler; import javax.websocket.Session; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.utils.JsonUtils; -import org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder; -import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; +import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; +import org.apache.pinot.core.realtime.stream.StreamDataProducer; +import org.apache.pinot.core.realtime.stream.StreamDataProvider; import org.glassfish.tyrus.client.ClientManager; public class MeetupRsvpStream { + + private static final String DEFAULT_KAFKA_BROKER = "localhost:19092"; + private Schema schema; - private Producer<String, byte[]> producer; + private StreamDataProducer producer; private boolean keepPublishing = true; private ClientManager client; public MeetupRsvpStream(File schemaFile) - throws IOException, URISyntaxException { + throws Exception { schema = Schema.fromFile(schemaFile); Properties properties = new Properties(); - properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER); + properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER); properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
properties.put("request.required.acks", "1"); - ProducerConfig producerConfig = new ProducerConfig(properties); - producer = new Producer<String, byte[]>(producerConfig); + producer = StreamDataProvider + .getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties); } public void stopPublishing() { @@ -68,7 +68,8 @@ public class MeetupRsvpStream { try { final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build(); - final KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder(); + final StreamMessageDecoder decoder = + (StreamMessageDecoder) Class.forName("org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder").newInstance(); decoder.init(null, schema, null); client = ClientManager.createClient(); client.connectToServer(new Endpoint() { @@ -108,9 +109,7 @@ public class MeetupRsvpStream { extracted.put("rsvp_count", 1); if (keepPublishing) { - KeyedMessage<String, byte[]> data = - new KeyedMessage<String, byte[]>("meetupRSVPEvents", extracted.toString().getBytes("UTF-8")); - producer.send(data); + producer.produce("meetupRSVPEvents", extracted.toString().getBytes("UTF-8")); } } catch (Exception e) { //LOGGER.error("error processing raw event ", e); diff --git a/pom.xml b/pom.xml index a734f94..0d00729 100644 --- a/pom.xml +++ b/pom.xml @@ -141,8 +141,7 @@ kafka dependency is still explicitly defined in pinot-integration-tests, pinot-tools and pinot-perf pom files. To change kafka connector dependency, we only need to update this version number config. 
TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency --> - <kafka.version>0.9</kafka.version> - <kafka.scala.version>2.10</kafka.scala.version> + <kafka.lib.version>0.9</kafka.lib.version> </properties> <profiles> @@ -929,12 +928,6 @@ <artifactId>jopt-simple</artifactId> <version>4.6</version> </dependency> - <!-- kafka_2.10 & larray use scala-library --> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>2.10.5</version> - </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org