This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3f682d0c454263a8ab5361d751f9f5c9b30e715a
Author: Ananth Packkildurai <apackkildu...@slack-corp.com>
AuthorDate: Wed Jun 12 18:31:22 2019 -0700

    WIP: adding kafka 2 stream provider
---
 .../pinot-connector-kafka-2.0/README.md            |  24 ++++
 pinot-connectors/pinot-connector-kafka-2.0/pom.xml |  67 ++++++++++
 .../impl/kafka2/KafkaConnectionHandler.java        |  61 +++++++++
 .../realtime/impl/kafka2/KafkaConsumerFactory.java |  49 +++++++
 .../realtime/impl/kafka2/KafkaMessageBatch.java    |  65 ++++++++++
 .../impl/kafka2/KafkaPartitionConsumer.java        |  51 ++++++++
 .../kafka2/KafkaPartitionLevelStreamConfig.java    | 144 +++++++++++++++++++++
 .../impl/kafka2/KafkaStreamConfigProperties.java   |  65 ++++++++++
 .../impl/kafka2/KafkaStreamMetadataProvider.java   |  81 ++++++++++++
 .../realtime/impl/kafka2/MessageAndOffset.java     |  49 +++++++
 pinot-connectors/pom.xml                           |   2 +
 11 files changed, 658 insertions(+)

diff --git a/pinot-connectors/pinot-connector-kafka-2.0/README.md 
b/pinot-connectors/pinot-connector-kafka-2.0/README.md
new file mode 100644
index 0000000..cc1950c
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/README.md
@@ -0,0 +1,24 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Pinot connector for kafka 2.0.x
+
+This is an implementation of the Kafka stream for Kafka versions 2.0.x. The version used in this implementation is Kafka 2.0.0, and the module compiles against that version; however, we have not tested whether it runs with older versions.
+A stream plugin for another version of Kafka, or another stream, can be added in a similar fashion. Refer to the documentation on [Pluggable Streams](https://pinot.readthedocs.io/en/latest/pluggable_streams.html) for the specific interfaces to implement.
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml 
b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
new file mode 100644
index 0000000..f351219
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>pinot-connectors</artifactId>
+        <groupId>org.apache.pinot</groupId>
+        <version>0.2.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>pinot-connector-kafka-2.0</artifactId>
+
+    <properties>
+        <pinot.root>${basedir}/../..</pinot.root>
+        <kafka.version>2.0.0</kafka.version>
+    </properties>
+
+    <dependencies>
+
+        <!-- Kafka  -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.jopt-simple</groupId>
+                    <artifactId>jopt-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
new file mode 100644
index 0000000..802062f
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+public abstract class KafkaConnectionHandler {
+
+    protected final KafkaPartitionLevelStreamConfig _config;
+    protected final int _partition;
+    protected final String _topic;
+    protected final Consumer<String, byte[]> _consumer;
+    protected final TopicPartition _topicPartition;
+
+    public KafkaConnectionHandler(StreamConfig streamConfig, int partition) {
+        _config = new KafkaPartitionLevelStreamConfig(streamConfig);
+        _partition = partition;
+        _topic = _config.getKafkaTopicName();
+        Properties consumerProp = new Properties();
+        consumerProp.putAll(streamConfig.getStreamConfigsMap());
+        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
_config.getBootstrapHosts());
+        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
+        _consumer = new KafkaConsumer<>(consumerProp);
+        _topicPartition = new TopicPartition(_topic, _partition);
+        _consumer.assign(Collections.singletonList(_topicPartition));
+
+    }
+
+    public void close() throws IOException {
+        _consumer.close();
+    }
+
+
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
new file mode 100644
index 0000000..cc3d8a6
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+public class KafkaConsumerFactory extends StreamConsumerFactory {
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String 
clientId, int partition) {
+        return new KafkaPartitionConsumer(_streamConfig, partition);
+    }
+
+    @Override
+    public StreamLevelConsumer createStreamLevelConsumer(String clientId, 
String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, 
ServerMetrics serverMetrics) {
+        throw new UnsupportedOperationException("High level consumer not 
supported in kafka 2. Use Kafka partition level consumers");
+    }
+
+    @Override
+    public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
+        return null;
+    }
+
+    @Override
+    public StreamMetadataProvider createStreamMetadataProvider(String 
clientId) {
+        throw new UnsupportedOperationException("High level consumer not 
supported in kafka 2. Use Kafka partition level consumers");
+    }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
new file mode 100644
index 0000000..22aa683
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaMessageBatch implements MessageBatch<byte[]> {
+
+    private List<MessageAndOffset> messageList = new ArrayList<>();
+
+    public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> 
iterable) {
+        for (ConsumerRecord<String, byte[]> record : iterable) {
+            messageList.add(new MessageAndOffset(record.value(), 
record.offset()));
+        }
+    }
+
+    @Override
+    public int getMessageCount() {
+        return messageList.size();
+    }
+
+    @Override
+    public byte[] getMessageAtIndex(int index) {
+        return messageList.get(index).getMessage().array();
+    }
+
+    @Override
+    public int getMessageOffsetAtIndex(int index) {
+        return messageList.get(index).getMessage().arrayOffset();
+    }
+
+    @Override
+    public int getMessageLengthAtIndex(int index) {
+        return messageList.get(index).getMessage().array().length;
+    }
+
+    @Override
+    public long getNextStreamMessageOffsetAtIndex(int index) {
+        return messageList.get(index).getNextOffset();
+    }
+
+    public Iterable<MessageAndOffset> iterable() {
+        return messageList;
+    }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
new file mode 100644
index 0000000..de3295d
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.*;
+
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class KafkaPartitionConsumer extends KafkaConnectionHandler implements 
PartitionLevelConsumer {
+
+
+    public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) {
+        super(streamConfig, partition);
+    }
+
+    @Override
+    public MessageBatch fetchMessages(long startOffset, long endOffset, int 
timeoutMillis) throws TimeoutException {
+        _consumer.seek(_topicPartition, startOffset);
+
+        ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(null);
+        List<ConsumerRecord<String, byte[]>> records = 
consumerRecords.records(_topicPartition);
+        return new KafkaMessageBatch(records);
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
new file mode 100644
index 0000000..c154a38
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaPartitionLevelStreamConfig {
+
+    private final String _kafkaTopicName;
+    private final String _bootstrapHosts;
+    private final int _kafkaBufferSize;
+    private final int _kafkaSocketTimeout;
+    private final int _kafkaFetcherSizeBytes;
+    private final int _kafkaFetcherMinBytes;
+    private final Map<String, String> _streamConfigMap;
+
+    /**
+     * Builds a wrapper around {@link StreamConfig} to fetch kafka partition 
level consumer related configs
+     * @param streamConfig
+     */
+    public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) {
+        _streamConfigMap = streamConfig.getStreamConfigsMap();
+
+        _kafkaTopicName = streamConfig.getTopicName();
+
+        String llcBrokerListKey = KafkaStreamConfigProperties
+                
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+        String llcBufferKey = KafkaStreamConfigProperties
+                
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+        String llcTimeoutKey = KafkaStreamConfigProperties
+                
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+        String fetcherSizeKey = KafkaStreamConfigProperties
+                
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+        String fetcherMinBytesKey = KafkaStreamConfigProperties
+                
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+        _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
+        _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, 
llcBufferKey,
+                
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+        _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, 
llcTimeoutKey,
+                
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+        _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, 
fetcherSizeKey, _kafkaBufferSize);
+        _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, 
fetcherMinBytesKey,
+                
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+        Preconditions.checkNotNull(_bootstrapHosts,
+                "Must specify kafka brokers list " + llcBrokerListKey + " in 
case of low level kafka consumer");
+    }
+
+    public String getKafkaTopicName() {
+        return _kafkaTopicName;
+    }
+
+    public String getBootstrapHosts() {
+        return _bootstrapHosts;
+    }
+
+    public int getKafkaBufferSize() {
+        return _kafkaBufferSize;
+    }
+
+    public int getKafkaSocketTimeout() {
+        return _kafkaSocketTimeout;
+    }
+
+    public int getKafkaFetcherSizeBytes() {
+        return _kafkaFetcherSizeBytes;
+    }
+
+    public int getKafkaFetcherMinBytes() {
+        return _kafkaFetcherMinBytes;
+    }
+
+    private int getIntConfigWithDefault(Map<String, String> configMap, String 
key, int defaultValue) {
+        String stringValue = configMap.get(key);
+        try {
+            if (StringUtils.isNotEmpty(stringValue)) {
+                return Integer.parseInt(stringValue);
+            }
+            return defaultValue;
+        } catch (NumberFormatException ex) {
+            return defaultValue;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + 
_kafkaTopicName + '\'' + ", _bootstrapHosts='"
+                + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + 
_kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
+                + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + 
_kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
+                + _kafkaFetcherMinBytes + '\'' + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (EqualityUtils.isSameReference(this, o)) {
+            return true;
+        }
+
+        if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+            return false;
+        }
+
+        KafkaPartitionLevelStreamConfig that = 
(KafkaPartitionLevelStreamConfig) o;
+
+        return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && 
EqualityUtils
+                .isEqual(_bootstrapHosts, that._bootstrapHosts) && 
EqualityUtils
+                .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && 
EqualityUtils
+                .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && 
EqualityUtils
+                .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) 
&& EqualityUtils
+                .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+        result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+        result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+        result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
+        return result;
+    }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
new file mode 100644
index 0000000..3c45d6e
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Joiner;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+
+
+/**
+ * Property key definitions for all kafka stream related properties
+ */
+public class KafkaStreamConfigProperties {
+  public static final String DOT_SEPARATOR = ".";
+  public static final String STREAM_TYPE = "kafka";
+
+  public static class HighLevelConsumer {
+    public static final String KAFKA_HLC_ZK_CONNECTION_STRING = 
"kafka.hlc.zk.connect.string";
+    public static final String ZK_SESSION_TIMEOUT_MS = 
"zookeeper.session.timeout.ms";
+    public static final String ZK_CONNECTION_TIMEOUT_MS = 
"zookeeper.connection.timeout.ms";
+    public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms";
+    public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries";
+    public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
+    public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
+    public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
+  }
+
+  public static class LowLevelConsumer {
+    public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+    public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
+    public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
+    public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
+    public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
+    public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size";
+    public static final String KAFKA_FETCHER_MIN_BYTES = 
"kafka.fetcher.minBytes";
+    public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
+  }
+
+  public static final String KAFKA_CONSUMER_PROP_PREFIX = 
"kafka.consumer.prop";
+
+  /**
+   * Helper method to create a property string for kafka stream
+   * @param property
+   * @return
+   */
+  public static String constructStreamProperty(String property) {
+    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, 
property);
+  }
+}
+
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
new file mode 100644
index 0000000..3871d85
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class KafkaStreamMetadataProvider extends KafkaConnectionHandler 
implements StreamMetadataProvider {
+
+    private AdminClient _adminClient;
+
+    public KafkaStreamMetadataProvider(StreamConfig streamConfig, int 
partition) {
+        super(streamConfig, partition);
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
_config.getBootstrapHosts());
+        _adminClient = AdminClient.create(props);
+    }
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        DescribeTopicsResult result = 
_adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName()));
+        Map<String, KafkaFuture<TopicDescription>> values = result.values();
+        KafkaFuture<TopicDescription> topicDescription = 
values.get(_config.getKafkaTopicName());
+        try {
+            return topicDescription.get().partitions().size();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("");
+        }
+    }
+
+    @Override
+    public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, 
long timeoutMillis) throws TimeoutException {
+
+        Preconditions.checkNotNull(offsetCriteria);
+        if (offsetCriteria.isLargest()) {
+            return 
_consumer.endOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+        } else if (offsetCriteria.isSmallest()) {
+            return 
_consumer.beginningOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+        } else {
+            throw new IllegalArgumentException("Unknown initial offset value " 
+ offsetCriteria.toString());
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
new file mode 100644
index 0000000..0dea267
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.nio.ByteBuffer;
+
/**
 * Immutable pairing of a message payload with the offset of the record it came from.
 */
public class MessageAndOffset {

    private final ByteBuffer _message;
    private final long _offset;

    /**
     * @param message raw message bytes; wrapped, not copied
     * @param offset  offset of the record this message was read from
     */
    public MessageAndOffset(byte[] message, long offset) {
        this(ByteBuffer.wrap(message), offset);
    }

    /**
     * @param message message payload; stored as-is, not copied
     * @param offset  offset of the record this message was read from
     */
    public MessageAndOffset(ByteBuffer message, long offset) {
        _message = message;
        _offset = offset;
    }

    public ByteBuffer getMessage() {
        return _message;
    }

    public long getOffset() {
        return _offset;
    }

    /**
     * @return the offset immediately following this message ({@code offset + 1})
     */
    public long getNextOffset() {
        return _offset + 1;
    }
}
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 3695189..64d798d 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -32,12 +32,14 @@
   <artifactId>pinot-connectors</artifactId>
   <packaging>pom</packaging>
   <name>Pinot Connectors</name>
+  <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/..</pinot.root>
   </properties>
 
   <modules>
     <module>pinot-connector-kafka-0.9</module>
+    <module>pinot-connector-kafka-2.0</module>
   </modules>
 
   <dependencies>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to