This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 616e2a2  Added Kafka Source and Kafka Sink to Pulsar Connect (#1557)
616e2a2 is described below

commit 616e2a278f11161aa91689cf20eecef9c572a3b9
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Apr 12 21:28:40 2018 -0700

    Added Kafka Source and Kafka Sink to Pulsar Connect (#1557)
    
    * Added Kafka Source and Kafka Sink to Pulsar Connect
    
    * Standardize on kafka versions for compat and connect
---
 pom.xml                                            |   1 +
 .../pulsar-client-kafka/pom.xml                    |   6 +-
 pulsar-connect/kafka/pom.xml                       |  67 +++++++++++
 .../org/apache/pulsar/connect/kafka/KafkaSink.java |  93 +++++++++++++++
 .../pulsar/connect/kafka/KafkaSinkConfig.java      |  59 ++++++++++
 .../apache/pulsar/connect/kafka/KafkaSource.java   | 126 +++++++++++++++++++++
 .../pulsar/connect/kafka/KafkaSourceConfig.java    |  61 ++++++++++
 pulsar-connect/pom.xml                             |   1 +
 8 files changed, 409 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index c67b742..26dced5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
     <aerospike-client.version>4.1.5</aerospike-client.version>
+    <kafka-client.version>0.10.2.1</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
 
     <!-- test dependencies -->
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 978058d..5c3ee5d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -37,10 +37,6 @@
   <description>Drop-in replacement for Kafka client library that publishes and consumes
   messages on Pulsar topics</description>
 
-  <properties>
-    <kafka.version>0.10.2.1</kafka.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -51,7 +47,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>${kafka.version}</version>
+      <version>${kafka-client.version}</version>
       <exclusions>
         <exclusion>
           <groupId>net.jpountz.lz4</groupId>
diff --git a/pulsar-connect/kafka/pom.xml b/pulsar-connect/kafka/pom.xml
new file mode 100644
index 0000000..cb08890
--- /dev/null
+++ b/pulsar-connect/kafka/pom.xml
@@ -0,0 +1,67 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-kafka</artifactId>
+  <name>Pulsar Connect :: Kafka</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
new file mode 100644
index 0000000..1f24309
--- /dev/null
+++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Simple Kafka sink that publishes each incoming {@link KeyValue} to a single Kafka topic.
+ *
+ * <p>{@link #open(Map)} parses the supplied map into a {@link KafkaSinkConfig}, validates it
+ * and creates the underlying {@link KafkaProducer}; {@link #write(KeyValue)} must not be
+ * called before {@code open} has returned successfully.
+ */
+public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+    private Producer<K, V> producer;
+    private Properties props = new Properties();
+    private KafkaSinkConfig kafkaSinkConfig;
+
+    /**
+     * Sends the key and value of {@code message} to the configured topic.
+     *
+     * <p>The returned future is completed from the producer's send callback, so no extra
+     * thread is blocked while the send is in flight.
+     *
+     * @param message the key/value pair to publish
+     * @return a future that completes with {@code null} once the producer reports the send
+     *         as done, or completes exceptionally with the exception reported by the producer
+     */
+    @Override
+    public CompletableFuture<Void> write(KeyValue<K, V> message) {
+        ProducerRecord<K, V> record =
+                new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue());
+        LOG.debug("Message sending to kafka, record={}.", record);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        producer.send(record, (metadata, exception) -> {
+            if (exception != null) {
+                result.completeExceptionally(exception);
+            } else {
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * Closes the producer if one was created by {@link #open(Map)}.
+     *
+     * @throws IOException declared by the overridden method; not thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+        // open() may have failed validation before the producer was created.
+        if (producer != null) {
+            producer.close();
+        }
+        LOG.info("Kafka sink stopped.");
+    }
+
+    /**
+     * Loads and validates the sink configuration, then creates the Kafka producer.
+     *
+     * @param config configuration entries, parsed by {@link KafkaSinkConfig#load(Map)}
+     * @throws IllegalArgumentException if topic, bootstrapServers or acks is missing, or if
+     *         batchSize or maxRequestSize is missing or zero
+     * @throws Exception if the configuration cannot be parsed
+     */
+    @Override
+    public void open(Map<String, String> config) throws Exception {
+        kafkaSinkConfig = KafkaSinkConfig.load(config);
+        if (kafkaSinkConfig.getTopic() == null
+                || kafkaSinkConfig.getBootstrapServers() == null
+                || kafkaSinkConfig.getAcks() == null
+                || isUnset(kafkaSinkConfig.getBatchSize())
+                || isUnset(kafkaSinkConfig.getMaxRequestSize())) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
+        props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSinkConfig.getBatchSize().toString());
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSinkConfig.getMaxRequestSize().toString());
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
+
+        producer = new KafkaProducer<>(props);
+
+        LOG.info("Kafka sink started.");
+    }
+
+    /**
+     * Returns true when a numeric property is absent or zero. The null check comes first so
+     * that a missing property is reported as a configuration error instead of failing with a
+     * NullPointerException while unboxing.
+     */
+    private static boolean isUnset(Long value) {
+        return value == null || value == 0L;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
new file mode 100644
index 0000000..45aea78
--- /dev/null
+++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration for the Kafka sink, loadable from a YAML file or from a string map.
+ *
+ * <p>{@code @Data} already generates the getters, setters, {@code equals}/{@code hashCode}
+ * and {@code toString}; {@code @Accessors(chain = true)} makes the setters return
+ * {@code this}.
+ */
+@Data
+@Accessors(chain = true)
+public class KafkaSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String bootstrapServers;
+    private String acks;
+    private Long batchSize;
+    private Long maxRequestSize;
+    private String topic;
+    // Both serializers default to Kafka's StringSerializer.
+    private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
+    private String valueSerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file to read
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static KafkaSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a map by serializing the map to JSON and reading it back
+     * as a {@code KafkaSinkConfig}.
+     *
+     * @param map property names mapped to their string values
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be converted
+     */
+    public static KafkaSinkConfig load(Map<String, String> map) throws IOException {
+        // One mapper serves both halves of the round trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), KafkaSinkConfig.class);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
new file mode 100644
index 0000000..dd7066f
--- /dev/null
+++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.connect.core.PushSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Simple Kafka source that polls a Kafka topic on a dedicated thread and hands every record
+ * value to the function registered through {@link #setConsumer}.
+ *
+ * <p>NOTE(review): the polling thread calls the registered function without a null check, so
+ * {@link #setConsumer} presumably has to be called before {@link #open(Map)} - confirm
+ * against the caller.
+ */
+public class KafkaSource<V> implements PushSource<V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+    private Consumer<String, V> consumer;
+    private Properties props;
+    private KafkaSourceConfig kafkaSourceConfig;
+    Thread runnerThread;
+
+    private java.util.function.Function<V, CompletableFuture<Void>> consumeFunction;
+
+    /**
+     * Loads and validates the source configuration, builds the consumer properties and
+     * starts the polling thread.
+     *
+     * @param config configuration entries, parsed by {@link KafkaSourceConfig#load(Map)}
+     * @throws IllegalArgumentException if topic, bootstrapServers or groupId is missing, or
+     *         if fetchMinBytes, autoCommitIntervalMs or sessionTimeoutMs is missing or zero
+     * @throws Exception if the configuration cannot be parsed
+     */
+    @Override
+    public void open(Map<String, String> config) throws Exception {
+        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        if (kafkaSourceConfig.getTopic() == null
+                || kafkaSourceConfig.getBootstrapServers() == null
+                || kafkaSourceConfig.getGroupId() == null
+                || isUnset(kafkaSourceConfig.getFetchMinBytes())
+                || isUnset(kafkaSourceConfig.getAutoCommitIntervalMs())
+                || isUnset(kafkaSourceConfig.getSessionTimeoutMs())) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId());
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaSourceConfig.getFetchMinBytes().toString());
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaSourceConfig.getAutoCommitIntervalMs().toString());
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaSourceConfig.getSessionTimeoutMs().toString());
+        // Without this the consumer keeps its own auto-commit default even when the source is
+        // configured to commit manually after the consume function has finished.
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(kafkaSourceConfig.isAutoCommitEnabled()));
+
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
+
+        this.start();
+    }
+
+    /**
+     * Interrupts the polling thread, waits for it to finish and then closes the consumer.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting for
+     *         the polling thread to finish
+     */
+    @Override
+    public void close() throws InterruptedException {
+        LOG.info("Stopping kafka source");
+        if (runnerThread != null) {
+            runnerThread.interrupt();
+            runnerThread.join();
+            runnerThread = null;
+        }
+        // The polling thread has finished by now, so the consumer is no longer in use.
+        if (consumer != null) {
+            consumer.close();
+            consumer = null;
+        }
+        LOG.info("Kafka source stopped.");
+    }
+
+    /**
+     * Creates the consumer and runs the poll loop on a new thread named
+     * "Kafka Source Thread". The loop ends when the thread is interrupted or when a batch
+     * fails while auto-commit is disabled.
+     */
+    public void start() {
+        runnerThread = new Thread(() -> {
+            LOG.info("Starting kafka source");
+            consumer = new KafkaConsumer<>(props);
+            consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
+            LOG.info("Kafka source started.");
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    ConsumerRecords<String, V> records = consumer.poll(1000);
+                    CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()];
+                    int index = 0;
+                    for (ConsumerRecord<String, V> record : records) {
+                        LOG.debug("Message received from kafka, key: {}. value: {}", record.key(), record.value());
+                        futures[index] = consumeFunction.apply(record.value());
+                        index++;
+                    }
+                    if (!kafkaSourceConfig.isAutoCommitEnabled()) {
+                        try {
+                            // Commit only after every record of the batch has been consumed.
+                            CompletableFuture.allOf(futures).get();
+                            consumer.commitSync();
+                        } catch (InterruptedException ex) {
+                            // Keep the interrupt visible to whoever inspects this thread.
+                            Thread.currentThread().interrupt();
+                            break;
+                        } catch (ExecutionException ex) {
+                            LOG.error("Failed to process messages read from kafka, stopping kafka source", ex);
+                            break;
+                        }
+                    }
+                }
+            } catch (org.apache.kafka.common.errors.InterruptException ex) {
+                // Raised by the Kafka client when close() interrupts this thread.
+                LOG.debug("Kafka source thread interrupted, exiting poll loop");
+            }
+        });
+        runnerThread.setName("Kafka Source Thread");
+        runnerThread.start();
+    }
+
+    /**
+     * Registers the function that receives every record value read from Kafka.
+     *
+     * @param consumeFunction called once per record; the future it returns is awaited
+     *        before offsets are committed when auto-commit is disabled
+     */
+    @Override
+    public void setConsumer(java.util.function.Function<V, CompletableFuture<Void>> consumeFunction) {
+        this.consumeFunction = consumeFunction;
+    }
+
+    /**
+     * Returns true when a numeric property is absent or zero. The null check comes first so
+     * that a missing property is reported as a configuration error instead of failing with a
+     * NullPointerException while unboxing.
+     */
+    private static boolean isUnset(Long value) {
+        return value == null || value == 0L;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
new file mode 100644
index 0000000..77fd77b
--- /dev/null
+++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration for the Kafka source, loadable from a YAML file or from a string map.
+ *
+ * <p>{@code @Data} already generates the getters, setters, {@code equals}/{@code hashCode}
+ * and {@code toString}; {@code @Accessors(chain = true)} makes the setters return
+ * {@code this}.
+ */
+@Data
+@Accessors(chain = true)
+public class KafkaSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String bootstrapServers;
+    private String groupId;
+    private Long fetchMinBytes;
+    private Long autoCommitIntervalMs;
+    private Long sessionTimeoutMs;
+    // Auto-commit is on unless the configuration says otherwise.
+    private boolean autoCommitEnabled = true;
+    private String topic;
+    // Both deserializers default to Kafka's StringDeserializer.
+    private String keyDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
+    private String valueDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer";
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file to read
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static KafkaSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a map by serializing the map to JSON and reading it back
+     * as a {@code KafkaSourceConfig}.
+     *
+     * @param map property names mapped to their string values
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be converted
+     */
+    public static KafkaSourceConfig load(Map<String, String> map) throws IOException {
+        // One mapper serves both halves of the round trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), KafkaSourceConfig.class);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 1b955ac..28a633f 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -36,6 +36,7 @@
     <module>twitter</module>
     <module>cassandra</module>
     <module>aerospike</module>
+    <module>kafka</module>
     <module>rabbitmq</module>
   </modules>
 

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to