This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch refactor_kafka
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3226348eab493d829215a8091e40a8caf7c0944d
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Wed Jul 17 17:28:20 2019 -0700

    Refactor pinot-connectors to break the dependency on Kafka 0.9
---
 pinot-common/pom.xml                               |   4 -
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   7 +-
 .../impl/kafka/server/KafkaDataProducer.java       |  57 +++++++++
 .../KafkaDataServerStartable.java}                 | 136 +++++++++++++--------
 pinot-core/pom.xml                                 |   4 -
 .../core/realtime/stream/StreamDataProducer.java   |  35 ++++++
 .../core/realtime/stream/StreamDataProvider.java   |  45 +++++++
 .../realtime/stream/StreamDataServerStartable.java |  33 +++++
 .../function/FunctionExpressionEvaluatorTest.java  |   1 -
 pinot-integration-tests/pom.xml                    |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |  25 ++--
 .../pinot/integration/tests/CommonKafkaUtils.java  | 102 ++++++++++++++++
 .../ControllerPeriodicTasksIntegrationTests.java   |   3 +-
 .../tests/HybridClusterIntegrationTest.java        |   3 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |  13 +-
 .../tests/RealtimeClusterIntegrationTest.java      |   3 +-
 pinot-perf/pom.xml                                 |   2 +-
 .../perf/BenchmarkRealtimeConsumptionSpeed.java    |  13 +-
 .../org/apache/pinot/perf/RealtimeStressTest.java  |  16 +--
 pinot-tools/pom.xml                                |   3 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  40 +++---
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  35 ++++--
 .../tools/admin/command/StartKafkaCommand.java     |  20 ++-
 .../admin/command/StreamAvroIntoKafkaCommand.java  |  25 ++--
 .../pinot/tools/streams/AirlineDataStream.java     |  21 ++--
 .../pinot/tools/streams/MeetupRsvpStream.java      |  29 +++--
 pom.xml                                            |   9 +-
 27 files changed, 503 insertions(+), 183 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 2e27ac8..41c9b39 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -198,10 +198,6 @@
       <artifactId>jopt-simple</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>nl.jqno.equalsverifier</groupId>
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index e8f6c93..0450102 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -42,7 +42,7 @@
     <!-- Kafka  -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${kafka.scala.version}</artifactId>
+      <artifactId>kafka_2.10</artifactId>
       <version>${kafka.lib.version}</version>
       <exclusions>
         <exclusion>
@@ -63,5 +63,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
new file mode 100644
index 0000000..0eb4ac6
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+
+
+public class KafkaDataProducer implements StreamDataProducer {
+  Producer<byte[], byte[]> producer;
+
+  @Override
+  public void init(Properties props) {
+    ProducerConfig producerConfig = new ProducerConfig(props);
+    this.producer = new Producer<>(producerConfig);
+  }
+
+  @Override
+  public void produce(String topic, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, payload);
+    this.produce(data);
+  }
+
+  @Override
+  public void produce(String topic, byte[] key, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, key, payload);
+    this.produce(data);
+  }
+
+  public void produce(KeyedMessage<byte[], byte[]> message) {
+    producer.send(message);
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+  }
+}
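
For reference, the new producer is meant to be obtained through the pinot-core factory added later in this commit rather than constructed directly. A minimal sketch (the broker address, topic name and payload below are illustrative placeholders):

    import java.util.Properties;
    import org.apache.pinot.core.realtime.stream.StreamDataProducer;
    import org.apache.pinot.core.realtime.stream.StreamDataProvider;

    public class ProducerExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:19092"); // assumed local broker
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        // The implementation class is loaded reflectively, so callers only depend on pinot-core
        StreamDataProducer producer = StreamDataProvider.getStreamDataProducer(
            "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", props);
        producer.produce("myTopic", "hello".getBytes("utf-8"));
        producer.close();
      }
    }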
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
similarity index 63%
rename from pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
rename to pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
index 5f1de99..dcd44c0 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.impl.kafka;
+package org.apache.pinot.core.realtime.impl.kafka.server;
 
 import java.io.File;
 import java.security.Permission;
@@ -29,17 +29,25 @@ import kafka.server.KafkaServerStartable;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 
 
-/**
- * Utilities to start Kafka during unit tests.
- *
- */
-public class KafkaStarterUtils {
+public class KafkaDataServerStartable implements StreamDataServerStartable {
   public static final int DEFAULT_KAFKA_PORT = 19092;
   public static final int DEFAULT_BROKER_ID = 0;
   public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
   public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
+  private static final String PORT = "port";
+  private static final String BROKER_ID = "brokerId";
+  private static final String ZK_STR = "zkStr";
+  private static final String LOG_DIR_PATH = "logDirPath";
+  private static final int DEFAULT_TOPIC_PARTITION = 1;
+
+  private KafkaServerStartable serverStartable;
+  private String zkStr;
+  private int port;
+  private int brokerId;
+  private String logDirPath;
 
   public static Properties getDefaultKafkaConfiguration() {
     final Properties configuration = new Properties();
@@ -50,51 +58,34 @@ public class KafkaStarterUtils {
     // Set host name
     configureHostName(configuration, "localhost");
 
+    configuration.put(PORT, DEFAULT_KAFKA_PORT);
+    configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+    configuration.put(ZK_STR, DEFAULT_ZK_STR);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
     return configuration;
   }
 
-  public static List<KafkaServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
+  public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
       final Properties configuration) {
-    List<KafkaServerStartable> startables = new ArrayList<>(brokerCount);
+    List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
 
     for (int i = 0; i < brokerCount; i++) {
-      startables.add(startServer(port + i, i, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration));
+      startables.add(startServer(port + i, i, zkStr, configuration));
     }
-
     return startables;
   }
 
-  public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
+  public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr,
       final Properties configuration) {
-    return startServer(port, brokerId, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration);
-  }
-
-  public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
-      final String logDirPath, final Properties configuration) {
-    // Create the ZK nodes for Kafka, if needed
-    int indexOfFirstSlash = zkStr.indexOf('/');
-    if (indexOfFirstSlash != -1) {
-      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
-      String zkNodePath = zkStr.substring(indexOfFirstSlash);
-      ZkClient client = new ZkClient(bareZkUrl);
-      client.createPersistent(zkNodePath, true);
-      client.close();
-    }
-
-    File logDir = new File(logDirPath);
-    logDir.mkdirs();
-
-    configureKafkaPort(configuration, port);
-    configureZkConnectionString(configuration, zkStr);
-    configureBrokerId(configuration, brokerId);
-    configureKafkaLogDirectory(configuration, logDir);
-    configuration.put("zookeeper.session.timeout.ms", "60000");
-    KafkaConfig config = new KafkaConfig(configuration);
-
-    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
-    serverStartable.startup();
-
-    return serverStartable;
+    final KafkaDataServerStartable kafkaDataServerStartable = new KafkaDataServerStartable();
+    configuration.put(PORT, port);
+    configuration.put(BROKER_ID, brokerId);
+    configuration.put(ZK_STR, zkStr);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+    kafkaDataServerStartable.init(configuration);
+    kafkaDataServerStartable.start();
+    return kafkaDataServerStartable;
   }
 
   public static void configureSegmentSizeBytes(Properties properties, int segmentSize) {
@@ -129,17 +120,6 @@ public class KafkaStarterUtils {
     configuration.put("host.name", hostName);
   }
 
-  public static void stopServer(KafkaServerStartable serverStartable) {
-    serverStartable.shutdown();
-    FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
-  }
-
-  public static void createTopic(String kafkaTopic, String zkStr, int partitionCount) {
-    invokeTopicCommand(
-        new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", 
"1", "--partitions", Integer.toString(
-            partitionCount), "--topic", kafkaTopic});
-  }
-
   private static void invokeTopicCommand(String[] args) {
     // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
     System.setSecurityManager(new SecurityManager() {
@@ -168,4 +148,58 @@ public class KafkaStarterUtils {
   public static void deleteTopic(String kafkaTopic, String zkStr) {
     invokeTopicCommand(new String[]{"--delete", "--zookeeper", zkStr, 
"--topic", kafkaTopic});
   }
+
+  public void init(Properties props) {
+    if (props == null) {
+      props = getDefaultKafkaConfiguration();
+    }
+    // The port and broker id may be stored as Integers (see startServer()), so read
+    // them via get() rather than getProperty(), which returns null for non-String values
+    port = Integer.parseInt(String.valueOf(props.get(PORT)));
+    brokerId = Integer.parseInt(String.valueOf(props.get(BROKER_ID)));
+    zkStr = String.valueOf(props.get(ZK_STR));
+    logDirPath = String.valueOf(props.get(LOG_DIR_PATH));
+
+    // Create the ZK nodes for Kafka, if needed
+    int indexOfFirstSlash = zkStr.indexOf('/');
+    if (indexOfFirstSlash != -1) {
+      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
+      String zkNodePath = zkStr.substring(indexOfFirstSlash);
+      ZkClient client = new ZkClient(bareZkUrl);
+      client.createPersistent(zkNodePath, true);
+      client.close();
+    }
+
+    File logDir = new File(logDirPath);
+    logDir.mkdirs();
+
+    configureKafkaPort(props, port);
+    configureZkConnectionString(props, zkStr);
+    configureBrokerId(props, brokerId);
+    configureKafkaLogDirectory(props, logDir);
+    props.put("zookeeper.session.timeout.ms", "60000");
+    KafkaConfig config = new KafkaConfig(props);
+
+    serverStartable = new KafkaServerStartable(config);
+  }
+
+  @Override
+  public void start() {
+    serverStartable.startup();
+  }
+
+  @Override
+  public void stop() {
+    serverStartable.shutdown();
+    FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
+  }
+
+  @Override
+  public void createTopic(String topic, Properties props) {
+    int partitionCount = DEFAULT_TOPIC_PARTITION;
+    if (props.containsKey("partition")) {
+      // The value may be stored as an Integer (see getTopicCreationProps), so don't use getProperty()
+      partitionCount = Integer.parseInt(String.valueOf(props.get("partition")));
+    }
+    invokeTopicCommand(
+        new String[]{"--create", "--zookeeper", this.zkStr, "--replication-factor", "1", "--partitions",
+            Integer.toString(partitionCount), "--topic", topic});
+  }
 }
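
For reference, a minimal lifecycle sketch of the renamed class driven through the new StreamDataServerStartable interface; it assumes a local ZooKeeper started via ZkStarter and falls back to the default configuration above when no Properties are passed:

    import java.util.Properties;
    import org.apache.pinot.common.utils.ZkStarter;
    import org.apache.pinot.core.realtime.stream.StreamDataProvider;
    import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;

    public class KafkaServerExample {
      public static void main(String[] args) throws Exception {
        ZkStarter.ZookeeperInstance zk = ZkStarter.startLocalZkServer();
        // No Properties given, so init() uses getDefaultKafkaConfiguration()
        StreamDataServerStartable kafka = StreamDataProvider.getServerDataStartable(
            "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable");
        kafka.start();
        Properties topicProps = new Properties();
        topicProps.put("partition", "2"); // partition count for the new topic
        kafka.createTopic("demoTopic", topicProps);
        kafka.stop();
        ZkStarter.stopLocalZkServer(zk);
      }
    }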
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 51c9231..d074639 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -161,10 +161,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
     </dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
new file mode 100644
index 0000000..53275a0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProducer is the interface for producers that write into a stream. E.g. KafkaDataProducer.
+ */
+public interface StreamDataProducer {
+  void init(Properties props);
+
+  void produce(String topic, byte[] payload);
+
+  void produce(String topic, byte[] key, byte[] payload);
+
+  void close();
+}
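
To illustrate the contract, a hypothetical non-Kafka implementation (not part of this commit) that only logs the messages it is asked to produce:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.pinot.core.realtime.stream.StreamDataProducer;

    public class LoggingDataProducer implements StreamDataProducer {
      @Override
      public void init(Properties props) {
        // a real implementation would create its client connection here
      }

      @Override
      public void produce(String topic, byte[] payload) {
        System.out.println(topic + ": " + new String(payload, StandardCharsets.UTF_8));
      }

      @Override
      public void produce(String topic, byte[] key, byte[] payload) {
        produce(topic, payload); // the key is ignored in this sketch
      }

      @Override
      public void close() {
        // nothing to release
      }
    }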
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
new file mode 100644
index 0000000..aec83a3
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+public class StreamDataProvider {
+  public static StreamDataServerStartable getServerDataStartable(String clazz, Properties props)
+      throws Exception {
+    final StreamDataServerStartable streamDataServerStartable =
+        (StreamDataServerStartable) Class.forName(clazz).newInstance();
+    streamDataServerStartable.init(props);
+    return streamDataServerStartable;
+  }
+
+  public static StreamDataProducer getStreamDataProducer(String clazz, Properties props)
+      throws Exception {
+
+    final StreamDataProducer streamDataProducer = (StreamDataProducer) Class.forName(clazz).newInstance();
+    streamDataProducer.init(props);
+    return streamDataProducer;
+  }
+
+  public static StreamDataServerStartable getServerDataStartable(String clazz)
+      throws Exception {
+    return getServerDataStartable(clazz, null);
+  }
+}
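
Because the implementation class name is a plain String, pinot-core itself no longer needs any Kafka artifact at compile time; the concrete connector only has to be on the runtime classpath. A sketch of starting a server with explicit settings (the property keys mirror the constants defined in KafkaDataServerStartable, and a local ZooKeeper on port 2181 is assumed):

    import java.util.Properties;
    import org.apache.pinot.core.realtime.stream.StreamDataProvider;
    import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;

    public class ExplicitKafkaStartExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("port", "19092");
        props.put("brokerId", "0");
        props.put("zkStr", "localhost:2181/kafka");
        props.put("logDirPath", "/tmp/kafka-example");
        StreamDataServerStartable server = StreamDataProvider.getServerDataStartable(
            "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable", props);
        server.start();
      }
    }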
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
new file mode 100644
index 0000000..f53e201
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataServerStartable is the interface for embedded stream data servers. E.g. KafkaDataServerStartable, KinesisDataServerStartable.
+ */
+public interface StreamDataServerStartable {
+  void init(Properties props);
+  void start();
+  void stop();
+
+  void createTopic(String topic, Properties props);
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
index b6b41ff..dca78da 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
@@ -26,7 +26,6 @@ import org.joda.time.MutableDateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import scala.collection.mutable.StringBuilder;
 
 
 public class FunctionExpressionEvaluatorTest {
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index b58647f..b54a7d8 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -191,7 +191,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..faffe9b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -40,7 +39,7 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
@@ -73,12 +72,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   protected final File _avroDir = new File(_tempDir, "avroDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
-  protected List<KafkaServerStartable> _kafkaStarters;
+  protected List<StreamDataServerStartable> _kafkaStarters;
 
   private org.apache.pinot.client.Connection _pinotConnection;
   private Connection _h2Connection;
   private QueryGenerator _queryGenerator;
 
+
   /**
    * The following getters can be overridden to change default settings.
    */
@@ -318,8 +318,9 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
       @Override
       public void run() {
         try {
-          ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
-              getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+          ClusterIntegrationTestUtils
+              .pushAvroIntoKafka(avroFiles, CommonKafkaUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+                  getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         } catch (Exception e) {
           // Ignored
         }
@@ -328,15 +329,17 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   protected void startKafka() {
-    _kafkaStarters = KafkaStarterUtils
-        .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
-            KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+
+    _kafkaStarters =
+        CommonKafkaUtils.startServers(getNumKafkaBrokers(), CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_ZK_STR,
+            CommonKafkaUtils.getDefaultKafkaConfiguration());
+    _kafkaStarters.get(0)
+        .createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions()));
   }
 
   protected void stopKafka() {
-    for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
-      KafkaStarterUtils.stopServer(kafkaStarter);
+    for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
+      kafkaStarter.stop();
     }
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java
new file mode 100644
index 0000000..7c6ed83
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+
+
+public class CommonKafkaUtils {
+  public static final int DEFAULT_BROKER_ID = 0;
+  public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
+  public static final String PORT = "port";
+  public static final String BROKER_ID = "brokerId";
+  public static final String ZK_STR = "zkStr";
+  public static final String LOG_DIR_PATH = "logDirPath";
+  public static final int DEFAULT_KAFKA_PORT = 19092;
+  public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
+  public static final String KAFKA_09_SERVER_STARTABLE_CLASS_NAME = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+
+  public static Properties getDefaultKafkaConfiguration() {
+    final Properties configuration = new Properties();
+
+    // Enable topic deletion by default for integration tests
+    configureTopicDeletion(configuration, true);
+
+    // Set host name
+    configureHostName(configuration, "localhost");
+
+    configuration.put(PORT, DEFAULT_KAFKA_PORT);
+    configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+    configuration.put(ZK_STR, DEFAULT_ZK_STR);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
+    return configuration;
+  }
+
+  public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) {
+    configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled));
+  }
+
+  public static void configureHostName(Properties configuration, String hostName) {
+    configuration.put("host.name", hostName);
+  }
+
+  public static Properties getTopicCreationProps(int numKafkaPartitions) {
+    Properties topicProps = new Properties();
+    topicProps.put("partition", numKafkaPartitions);
+    return topicProps;
+  }
+
+
+  public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
+      final Properties configuration) {
+    List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
+
+    for (int i = 0; i < brokerCount; i++) {
+      startables.add(startServer(port + i, i, zkStr, configuration));
+    }
+    return startables;
+  }
+
+  public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr,
+      final Properties configuration) {
+    StreamDataServerStartable kafkaStarter;
+
+    String kafkaClazz = KAFKA_09_SERVER_STARTABLE_CLASS_NAME;
+    try {
+      kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+
+    configuration.put(CommonKafkaUtils.PORT, port);
+    configuration.put(CommonKafkaUtils.BROKER_ID, brokerId);
+    configuration.put(CommonKafkaUtils.ZK_STR, zkStr);
+    configuration.put(CommonKafkaUtils.LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
+    kafkaStarter.init(configuration);
+    kafkaStarter.start();
+    return kafkaStarter;
+  }
+}
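
A sketch of the intended test usage of this helper (placed in the org.apache.pinot.integration.tests package, with the shared ZooKeeper started via ZkStarter):

    package org.apache.pinot.integration.tests;

    import java.util.List;
    import org.apache.pinot.common.utils.ZkStarter;
    import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;

    public class KafkaTestHarnessExample {
      public static void main(String[] args) {
        ZkStarter.ZookeeperInstance zk = ZkStarter.startLocalZkServer();
        List<StreamDataServerStartable> brokers = CommonKafkaUtils.startServers(2,
            CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_ZK_STR,
            CommonKafkaUtils.getDefaultKafkaConfiguration());
        // Topic administration now goes through one of the startables
        brokers.get(0).createTopic("testTopic", CommonKafkaUtils.getTopicCreationProps(4));
        for (StreamDataServerStartable broker : brokers) {
          broker.stop();
        }
        ZkStarter.stopLocalZkServer(zk);
      }
    }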
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index fc14789..64fe60f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -51,7 +51,6 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.ITestContext;
@@ -212,7 +211,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
+    addRealtimeTable(table, useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR, topic,
         getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
         getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
         getTaskConfig(), getStreamConsumerFactoryClassName());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 0983d4e..004cdf2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -33,7 +33,6 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -132,7 +131,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+    addHybridTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
         TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
         getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index be24be8..970265c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -34,13 +34,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -188,7 +187,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private List<File> _realtimeAvroFiles;
     private File _queryFile;
     private File _responseFile;
-    private KafkaServerStartable _kafkaStarter;
+    private StreamDataServerStartable _kafkaStarter;
     private long _countStarResult;
 
     public CustomHybridClusterIntegrationTest() {
@@ -258,11 +257,11 @@ public class HybridClusterIntegrationTestCommandLineRunner {
 
       // Start Zk and Kafka
       startZk(ZK_PORT);
-      _kafkaStarter = KafkaStarterUtils.startServer(KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
-          KafkaStarterUtils.getDefaultKafkaConfiguration());
+      _kafkaStarter = CommonKafkaUtils.startServer(KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
+          CommonKafkaUtils.getDefaultKafkaConfiguration());
 
       // Create Kafka topic
-      KafkaStarterUtils.createTopic(getKafkaTopic(), KAFKA_ZK_STR, getNumKafkaPartitions());
+      _kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions()));
 
       // Start the Pinot cluster
       ControllerConf config = getDefaultControllerConfiguration();
@@ -379,7 +378,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       stopServer();
       stopBroker();
       stopController();
-      KafkaStarterUtils.stopServer(_kafkaStarter);
+      _kafkaStarter.stop();
       stopZk();
 
       FileUtils.deleteDirectory(_tempDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 8b4baeb..cf833ce 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -90,7 +89,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+    addRealtimeTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
         getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
         getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74bb09f..2a0f5ed 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -52,7 +52,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 8bbb4d5..11535aa 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -27,8 +27,9 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.CommonKafkaUtils;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
 
@@ -58,12 +59,12 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    StreamDataServerStartable kafkaStarter = CommonKafkaUtils
+        .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID,
+            CommonKafkaUtils.DEFAULT_ZK_STR, CommonKafkaUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
@@ -93,7 +94,7 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
       public void run() {
         try {
           ClusterIntegrationTestUtils
-              .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
+              .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
                   ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         } catch (Exception e) {
           // Ignored
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
index 71d28e7..7ffb16a 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.CommonKafkaUtils;
 import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
@@ -59,12 +59,12 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    StreamDataServerStartable kafkaStarter = CommonKafkaUtils
+        .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID,
+            CommonKafkaUtils.DEFAULT_ZK_STR, CommonKafkaUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
@@ -93,7 +93,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
 
     // Generate ROW_COUNT rows and write them into Kafka
     ClusterIntegrationTestUtils
-        .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
+        .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
             getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
     rowsWritten += ROW_COUNT;
 
@@ -115,7 +115,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
       // Write more rows if needed
       if (rowsWritten - pinotRecordCount < MIN_ROW_COUNT) {
         ClusterIntegrationTestUtils
-            .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
+            .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
                 ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         rowsWritten += ROW_COUNT;
       }
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 6e18295..cf9f507 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,8 +56,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index fe004d3..22f7476 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -23,12 +23,13 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -37,16 +38,21 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class HybridQuickstart {
-  private HybridQuickstart() {
-  }
-
   private File _offlineQuickStartDataDir;
   private File _realtimeQuickStartDataDir;
-  private KafkaServerStartable _kafkaStarter;
+  private StreamDataServerStartable _kafkaStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private File _schemaFile;
   private File _dataFile;
 
+  private HybridQuickstart() {
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    new HybridQuickstart().execute();
+  }
+
   private QuickstartTableRequest prepareOfflineTableRequest()
       throws IOException {
     _offlineQuickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -94,11 +100,16 @@ public class HybridQuickstart {
   private void startKafka() {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
 
-    _kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    KafkaStarterUtils.createTopic("airlineStatsEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    Properties topicProps = new Properties();
+    topicProps.put("partition", 10);
+    _kafkaStarter.createTopic("airlineStatsEvents", topicProps);
   }
 
   public void execute()
@@ -153,7 +164,7 @@ public class HybridQuickstart {
           stream.shutdown();
           Thread.sleep(2000);
           runner.stop();
-          KafkaStarterUtils.stopServer(_kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(_zookeeperInstance);
           FileUtils.deleteDirectory(_offlineQuickStartDataDir);
           FileUtils.deleteDirectory(_realtimeQuickStartDataDir);
@@ -163,9 +174,4 @@ public class HybridQuickstart {
       }
     });
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new HybridQuickstart().execute();
-  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 8df1fbb..44661fa 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -22,10 +22,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -35,9 +36,16 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class RealtimeQuickStart {
+  private StreamDataServerStartable _kafkaStarter;
+
   private RealtimeQuickStart() {
   }
 
+  public static void main(String[] args)
+      throws Exception {
+    new RealtimeQuickStart().execute();
+  }
+
   public void execute()
       throws Exception {
     final File quickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -64,10 +72,18 @@ public class RealtimeQuickStart {
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
     final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
-    final KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic("meetupRSVPEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    Properties topicProps = new Properties();
+    topicProps.put("partition", 10);
+    _kafkaStarter.createTopic("meetupRSVPEvents", topicProps);
+
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
     printStatus(Color.CYAN, "***** Adding meetupRSVP schema *****");
@@ -87,7 +103,7 @@ public class RealtimeQuickStart {
           printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
           meetupRSVPProvider.stopPublishing();
           runner.stop();
-          KafkaStarterUtils.stopServer(kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(zookeeperInstance);
           FileUtils.deleteDirectory(quickStartDataDir);
         } catch (Exception e) {
@@ -130,9 +146,4 @@ public class RealtimeQuickStart {
 
     printStatus(Color.GREEN, "You can always go to http://localhost:9000/query/ to play around in the query console");
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new RealtimeQuickStart().execute();
-  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
index 38f042c..e1e7d40 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
@@ -20,7 +20,8 @@ package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import java.util.Properties;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -32,17 +33,22 @@ import org.slf4j.LoggerFactory;
  */
 public class StartKafkaCommand extends AbstractBaseAdminCommand implements Command {
   private static final Logger LOGGER = LoggerFactory.getLogger(StartKafkaCommand.class);
+  private static final int DEFAULT_KAFKA_PORT = 19092;
+  private static final int DEFAULT_BROKER_ID = 0;
+
+
   @Option(name = "-port", required = false, metaVar = "<int>", usage = "Port 
to start Kafka server on.")
-  private int _port = KafkaStarterUtils.DEFAULT_KAFKA_PORT;
+  private int _port = DEFAULT_KAFKA_PORT;
 
   @Option(name = "-help", required = false, help = true, aliases = {"-h", 
"--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
   @Option(name = "-brokerId", required = false, metaVar = "<int>", usage = 
"Kafka broker ID.")
-  private int _brokerId = KafkaStarterUtils.DEFAULT_BROKER_ID;
+  private int _brokerId = DEFAULT_BROKER_ID;
 
   @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = 
"Address of Zookeeper.")
   private String _zkAddress = "localhost:2181";
+  private StreamDataServerStartable _kafkaStarter;
 
   @Override
   public boolean getHelp() {
@@ -67,7 +73,13 @@ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Comma
   @Override
   public boolean execute()
       throws IOException {
-    KafkaStarterUtils.startServer(_port, _brokerId, _zkAddress, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    // Pass the command-line options through; otherwise -port, -brokerId and -zkAddress
+    // would be silently ignored and the server would come up with its defaults.
+    Properties props = new Properties();
+    props.put("port", String.valueOf(_port));
+    props.put("brokerId", String.valueOf(_brokerId));
+    props.put("zkStr", _zkAddress);
+    props.put("logDirPath", "/tmp/kafka-" + Double.toHexString(Math.random()));
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, props);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
 
     LOGGER.info("Start kafka at localhost:" + _port + " in thread " + 
Thread.currentThread().getName());
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 0a75023..f0ff30e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -24,14 +24,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.util.AvroUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -42,8 +40,8 @@ import org.slf4j.LoggerFactory;
  * Class for command to stream Avro data into Kafka.
  */
 public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand implements Command {
+  public static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamAvroIntoKafkaCommand.class);
-
   @Option(name = "-avroFile", required = true, metaVar = "<String>", usage = 
"Avro file to stream.")
   private String _avroFile = null;
 
@@ -51,7 +49,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   private boolean _help = false;
 
   @Option(name = "-kafkaBrokerList", required = false, metaVar = "<String>", 
usage = "Kafka broker list.")
-  private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
+  private String _kafkaBrokerList = DEFAULT_KAFKA_BROKER;
 
   @Option(name = "-kafkaTopic", required = true, metaVar = "<String>", usage = 
"Kafka topic to stream into.")
   private String _kafkaTopic = null;
@@ -104,8 +102,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
+    String producerClass = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer";
+    StreamDataProducer streamDataProducer;
+    try {
+      streamDataProducer = StreamDataProvider.getStreamDataProducer(producerClass, properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get StreamDataProducer - " + producerClass, e);
+    }
     try {
       // Open the Avro file
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new 
File(_avroFile));
@@ -115,11 +118,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
         // Write the message to Kafka
         String recordJson = genericRecord.toString();
         byte[] bytes = recordJson.getBytes("utf-8");
-        KeyedMessage<byte[], byte[]> data =
-            new KeyedMessage<byte[], byte[]>(_kafkaTopic, 
Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)),
-                bytes);
-
-        producer.send(data);
+        streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
 
         // Sleep between messages
         if (sleepRequired) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 7c238bf..9a189cf 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -27,9 +27,6 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -37,7 +34,8 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.tools.Quickstart;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,28 +43,29 @@ import org.slf4j.LoggerFactory;
 
 public class AirlineDataStream {
   private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
+  private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
 
   Schema pinotSchema;
   File avroFile;
   DataFileStream<GenericRecord> avroDataStream;
   Integer currentTimeValue = 16102;
   boolean keepIndexing = true;
-  private Producer<String, byte[]> producer;
   ExecutorService service;
   int counter = 0;
+  private StreamDataProducer producer;
 
   public AirlineDataStream(Schema pinotSchema, File avroFile)
-      throws FileNotFoundException, IOException {
+      throws Exception {
     this.pinotSchema = pinotSchema;
     this.avroFile = avroFile;
     createStream();
     Properties properties = new Properties();
-    properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = StreamDataProvider.getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties);
+
     service = Executors.newFixedThreadPool(1);
     Quickstart.printStatus(Quickstart.Color.YELLOW,
         "***** Offine data has max time as 16101, realtime will start 
consuming from time 16102 and increment time every 3000 events *****");
@@ -97,9 +96,7 @@ public class AirlineDataStream {
       avroDataStream = null;
       return;
     }
-    KeyedMessage<String, byte[]> data =
-        new KeyedMessage<String, byte[]>("airlineStatsEvents", 
message.toString().getBytes("UTF-8"));
-    producer.send(data);
+    producer.produce("airlineStatsEvents", 
message.toString().getBytes("UTF-8"));
   }
 
   public void run() {
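For reference, a Kafka 0.9 implementation of that interface could simply wrap the legacy producer client that the removed lines used. This is a hedged sketch, not the actual KafkaDataProducer shipped in pinot-connector-kafka-0.9:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Hedged sketch of a StreamDataProducer backed by the old Kafka client.
    public class KafkaDataProducerSketch {
      private Producer<byte[], byte[]> _producer;

      public void init(Properties props) {
        _producer = new Producer<byte[], byte[]>(new ProducerConfig(props));
      }

      public void produce(String topic, byte[] payload) {
        _producer.send(new KeyedMessage<byte[], byte[]>(topic, payload));
      }

      public void produce(String topic, byte[] key, byte[] payload) {
        _producer.send(new KeyedMessage<byte[], byte[]>(topic, key, payload));
      }

      public void close() {
        _producer.close();
      }
    }
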
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 114072f..c4e1539 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -23,40 +23,40 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Properties;
 import javax.websocket.ClientEndpointConfig;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
 import javax.websocket.MessageHandler;
 import javax.websocket.Session;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.glassfish.tyrus.client.ClientManager;
 
 
 public class MeetupRsvpStream {
+
+  private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
+
   private Schema schema;
-  private Producer<String, byte[]> producer;
+  private StreamDataProducer producer;
   private boolean keepPublishing = true;
   private ClientManager client;
 
   public MeetupRsvpStream(File schemaFile)
-      throws IOException, URISyntaxException {
+      throws Exception {
     schema = Schema.fromFile(schemaFile);
 
     Properties properties = new Properties();
-    properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = StreamDataProvider
+        .getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties);
   }
 
   public void stopPublishing() {
@@ -68,7 +68,8 @@ public class MeetupRsvpStream {
     try {
 
       final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
-      final KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
+      final StreamMessageDecoder decoder =
+          (StreamMessageDecoder) Class.forName("org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder").newInstance();
       decoder.init(null, schema, null);
       client = ClientManager.createClient();
       client.connectToServer(new Endpoint() {
@@ -108,9 +109,7 @@ public class MeetupRsvpStream {
                   extracted.put("rsvp_count", 1);
 
                   if (keepPublishing) {
-                    KeyedMessage<String, byte[]> data =
-                        new KeyedMessage<String, byte[]>("meetupRSVPEvents", 
extracted.toString().getBytes("UTF-8"));
-                    producer.send(data);
+                    producer.produce("meetupRSVPEvents", 
extracted.toString().getBytes("UTF-8"));
                   }
                 } catch (Exception e) {
                   //LOGGER.error("error processing raw event ", e);
diff --git a/pom.xml b/pom.xml
index a734f94..0d00729 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,8 +141,7 @@
     kafka dependency is still explicitly defined in pinot-integration-tests, pinot-tools and pinot-perf pom files.
     To change kafka connector dependency, we only need to update this version number config.
     TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency -->
-    <kafka.version>0.9</kafka.version>
-    <kafka.scala.version>2.10</kafka.scala.version>
+    <kafka.lib.version>0.9</kafka.lib.version>
   </properties>
 
   <profiles>
@@ -929,12 +928,6 @@
         <artifactId>jopt-simple</artifactId>
         <version>4.6</version>
       </dependency>
-      <!-- kafka_2.10 & larray use scala-library -->
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <version>2.10.5</version>
-      </dependency>
       <dependency>
         <groupId>commons-lang</groupId>
         <artifactId>commons-lang</artifactId>

