This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 616e2a2 Added Kafka Source and Kafka Sink to Pulsar Connect (#1557) 616e2a2 is described below commit 616e2a278f11161aa91689cf20eecef9c572a3b9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Apr 12 21:28:40 2018 -0700 Added Kafka Source and Kafka Sink to Pulsar Connect (#1557) * Added Kafka Source and Kafka Sink to Pulsar Connect * Standardize on kafka versions for compat and connect --- pom.xml | 1 + .../pulsar-client-kafka/pom.xml | 6 +- pulsar-connect/kafka/pom.xml | 67 +++++++++++ .../org/apache/pulsar/connect/kafka/KafkaSink.java | 93 +++++++++++++++ .../pulsar/connect/kafka/KafkaSinkConfig.java | 59 ++++++++++ .../apache/pulsar/connect/kafka/KafkaSource.java | 126 +++++++++++++++++++++ .../pulsar/connect/kafka/KafkaSourceConfig.java | 61 ++++++++++ pulsar-connect/pom.xml | 1 + 8 files changed, 409 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index c67b742..26dced5 100644 --- a/pom.xml +++ b/pom.xml @@ -144,6 +144,7 @@ flexible messaging model and an intuitive client API.</description> <hbc-core.version>2.2.0</hbc-core.version> <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version> <aerospike-client.version>4.1.5</aerospike-client.version> + <kafka-client.version>0.10.2.1</kafka-client.version> <rabbitmq-client.version>5.1.1</rabbitmq-client.version> <!-- test dependencies --> diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml index 978058d..5c3ee5d 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml @@ -37,10 +37,6 @@ <description>Drop-in replacement for Kafka client library that publishes and consumes messages on Pulsar topics</description> - <properties> - <kafka.version>0.10.2.1</kafka.version> - </properties> - <dependencies> <dependency> <groupId>${project.groupId}</groupId> @@ -51,7 +47,7 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>${kafka.version}</version> + <version>${kafka-client.version}</version> <exclusions> <exclusion> <groupId>net.jpountz.lz4</groupId> diff --git a/pulsar-connect/kafka/pom.xml b/pulsar-connect/kafka/pom.xml new file mode 100644 index 0000000..cb08890 --- /dev/null +++ b/pulsar-connect/kafka/pom.xml @@ -0,0 +1,67 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-connect</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-connect-kafka</artifactId> + <name>Pulsar Connect :: Kafka</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-connect-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka-client.version}</version> + </dependency> + + </dependencies> + +</project> diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java new file mode 100644 index 0000000..1f24309 --- /dev/null +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pulsar.common.util.KeyValue; +import org.apache.pulsar.connect.core.Sink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Simple Kafka Sink to publish messages to a Kafka topic + */ +public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + + private Producer<K, V> producer; + private Properties props = new Properties(); + private KafkaSinkConfig kafkaSinkConfig; + + @Override + public CompletableFuture<Void> write(KeyValue<K, V> message) { + ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue()); + LOG.debug("Message sending to kafka, record={}.", record); + Future f = producer.send(record); + return CompletableFuture.supplyAsync(() -> { + try { + f.get(); + return null; + } catch (InterruptedException|ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void close() throws IOException { + producer.close(); + LOG.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, String> config) throws Exception { + kafkaSinkConfig = KafkaSinkConfig.load(config); + if (kafkaSinkConfig.getTopic() == null + || kafkaSinkConfig.getBootstrapServers() == null + || kafkaSinkConfig.getAcks() == null + || kafkaSinkConfig.getBatchSize() == 0 + || kafkaSinkConfig.getMaxRequestSize() == 0) { + throw new IllegalArgumentException("Required property not set."); + } + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers()); + props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks()); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSinkConfig.getBatchSize().toString()); + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSinkConfig.getMaxRequestSize().toString()); + + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass()); + + producer = new KafkaProducer<>(props); + + LOG.info("Kafka sink started."); + } +} \ No newline at end of file diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java new file mode 100644 index 0000000..45aea78 --- /dev/null +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.*; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class KafkaSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String bootstrapServers; + private String acks; + private Long batchSize; + private Long maxRequestSize; + private String topic; + private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer"; + private String valueSerializerClass = "org.apache.kafka.common.serialization.StringSerializer"; + + public static KafkaSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class); + } + + public static KafkaSinkConfig load(Map<String, String> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSinkConfig.class); + } +} \ No newline at end of file diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java new file mode 100644 index 0000000..dd7066f --- /dev/null +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.pulsar.connect.core.PushSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Simple Kafka Source to transfer messages from a Kafka topic + */ +public class KafkaSource<V> implements PushSource<V> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); + + private Consumer<String, V> consumer; + private Properties props; + private KafkaSourceConfig kafkaSourceConfig; + Thread runnerThread; + + private java.util.function.Function<V, CompletableFuture<Void>> consumeFunction; + + @Override + public void open(Map<String, String> config) throws Exception { + kafkaSourceConfig = KafkaSourceConfig.load(config); + if (kafkaSourceConfig.getTopic() == null + || kafkaSourceConfig.getBootstrapServers() == null + || kafkaSourceConfig.getGroupId() == null + || kafkaSourceConfig.getFetchMinBytes() == 0 + || kafkaSourceConfig.getAutoCommitIntervalMs() == 0 + || kafkaSourceConfig.getSessionTimeoutMs() == 0) { + throw new IllegalArgumentException("Required property not set."); + } + + props = new Properties(); + + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId()); + props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaSourceConfig.getFetchMinBytes().toString()); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaSourceConfig.getAutoCommitIntervalMs().toString()); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaSourceConfig.getSessionTimeoutMs().toString()); + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass()); + + this.start(); + + } + + @Override + public void close() throws InterruptedException { + LOG.info("Stopping kafka source"); + if (runnerThread != null) { + runnerThread.interrupt(); + runnerThread.join(); + runnerThread = null; + } + if(consumer != null) { + consumer.close(); + consumer = null; + } + LOG.info("Kafka source stopped."); + } + + public void start() { + runnerThread = new Thread(() -> { + LOG.info("Starting kafka source"); + consumer = new KafkaConsumer<>(props); + consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic())); + LOG.info("Kafka source started."); + ConsumerRecords<String, V> records; + while(true){ + records = consumer.poll(1000); + CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()]; + int index = 0; + for (ConsumerRecord<String, V> record : records) { + LOG.debug("Message received from kafka, key: {}. value: {}", record.key(), record.value()); + futures[index] = consumeFunction.apply(record.value()); + index++; + } + if (!kafkaSourceConfig.isAutoCommitEnabled()) { + try { + CompletableFuture.allOf(futures).get(); + consumer.commitSync(); + } catch (ExecutionException | InterruptedException ex) { + break; + } + } + } + + }); + runnerThread.setName("Kafka Source Thread"); + runnerThread.start(); + } + + @Override + public void setConsumer(java.util.function.Function<V, CompletableFuture<Void>> consumeFunction) { + this.consumeFunction = consumeFunction; + } +} \ No newline at end of file diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java new file mode 100644 index 0000000..77fd77b --- /dev/null +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.*; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class KafkaSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String bootstrapServers; + private String groupId; + private Long fetchMinBytes; + private Long autoCommitIntervalMs; + private Long sessionTimeoutMs; + private boolean autoCommitEnabled = true; + private String topic; + private String keyDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer"; + private String valueDeserializationClass = "org.apache.kafka.common.serialization.StringDeserializer"; + + public static KafkaSourceConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class); + } + + public static KafkaSourceConfig load(Map<String, String> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class); + } +} \ No newline at end of file diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml index 1b955ac..28a633f 100644 --- a/pulsar-connect/pom.xml +++ b/pulsar-connect/pom.xml @@ -36,6 +36,7 @@ <module>twitter</module> <module>cassandra</module> <module>aerospike</module> + <module>kafka</module> <module>rabbitmq</module> </modules> -- To stop receiving notification emails like this one, please contact mme...@apache.org.