This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 0372947  [pulsar-kafka] add auto update partition support to producer/consumer (#13)
0372947 is described below

commit 0372947e326278a60915b0a647270ebc7ab33620
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu May 13 23:24:29 2021 -0700

    [pulsar-kafka] add auto update partition support to producer/consumer (#13)
---
 .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java    | 5 ++++-
 .../pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java    | 4 ++++
 .../org/apache/kafka/clients/producer/PulsarKafkaProducer.java   | 9 ++++-----
 .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java    | 5 ++++-
 .../pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java    | 4 ++++
 5 files changed, 20 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.kafka.compat;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
+import static 
org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@ public class PulsarConsumerKafkaConfig {
             consumerBuilder.subscriptionTopicsMode(mode);
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            
consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return consumerBuilder;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 7554faf..3315cd2 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -34,6 +34,7 @@ public class PulsarProducerKafkaConfig {
     public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 
"pulsar.producer.max.pending.messages.across.partitions";
     public static final String BATCHING_ENABLED = 
"pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = 
"pulsar.producer.batching.max.messages";
+    public static final String AUTO_UPDATE_PARTITIONS = 
"pulsar.auto.update.partitions";
     /**
      * send operations will immediately fail with {@link 
ProducerQueueIsFullError} when there is no space left in
      * pending queue.
@@ -66,6 +67,9 @@ public class PulsarProducerKafkaConfig {
             
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            
producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return producerBuilder;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7d6e146..48481a9 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,18 +19,14 @@
 package org.apache.kafka.clients.producer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.util.Base64;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -73,6 +69,7 @@ public class PulsarKafkaProducer<K, V> extends Producer<K, V> 
{
     public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = 
"queue.buffering.max.messages";
     public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
     public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+    public static String AUTO_UPDATE_PARTITIONS = 
"pulsar.auto.update.partitions";
 
     private final ConcurrentMap<String, 
org.apache.pulsar.client.api.Producer<byte[]>> producers = new 
ConcurrentHashMap<>();
 
@@ -134,7 +131,9 @@ public class PulsarKafkaProducer<K, V> extends Producer<K, 
V> {
         if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
             pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), 
TimeUnit.MILLISECONDS);
         }
-
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            
pulsarProducerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         
pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
 
     }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.kafka.compat;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
+import static 
org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@ public class PulsarConsumerKafkaConfig {
             consumerBuilder.subscriptionTopicsMode(mode);
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            
consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return consumerBuilder;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 5a9a651..509df03 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -33,6 +33,7 @@ public class PulsarProducerKafkaConfig {
     public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 
"pulsar.producer.max.pending.messages.across.partitions";
     public static final String BATCHING_ENABLED = 
"pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = 
"pulsar.producer.batching.max.messages";
+    public static final String AUTO_UPDATE_PARTITIONS = 
"pulsar.auto.update.partitions";
 
     public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient 
client, Properties properties) {
         ProducerBuilder<byte[]> producerBuilder = client.newProducer();
@@ -60,6 +61,9 @@ public class PulsarProducerKafkaConfig {
             
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            
producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return producerBuilder;
     }
 }

Reply via email to