merlimat closed pull request #1311: Converted main part of code to use builder 
APIs with typed interface
URL: https://github.com/apache/incubator-pulsar/pull/1311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 19106802c..bbdc69695 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -78,11 +78,12 @@
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
 import 
org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -499,27 +500,29 @@ public PulsarClient getReplicationClient(String cluster) {
                 String path = PulsarWebResource.path("clusters", cluster);
                 ClusterData data = 
this.pulsar.getConfigurationCache().clustersCache().get(path)
                         .orElseThrow(() -> new 
KeeperException.NoNodeException(path));
-                ClientConfiguration configuration = new ClientConfiguration();
-                configuration.setUseTcpNoDelay(false);
-                
configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
-                configuration.setStatsInterval(0, TimeUnit.SECONDS);
+                ClientBuilder clientBuilder = PulsarClient.builder()
+                        .enableTcpNoDelay(false)
+                        
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
+                        .statsInterval(0, TimeUnit.SECONDS);
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
-                    
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+                    
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                 }
-                String clusterUrl = null;
                 if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
-                    clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? 
data.getBrokerServiceUrlTls()
-                            : data.getServiceUrlTls();
-                    configuration.setUseTls(true);
-                    
configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
-                    configuration
-                            
.setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
+                    clientBuilder
+                            
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? 
data.getBrokerServiceUrlTls()
+                                    : data.getServiceUrlTls())
+                            .enableTls(true)
+                            
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
+                            
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
                 } else {
-                    clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? 
data.getBrokerServiceUrl()
-                            : data.getServiceUrl();
+                    clientBuilder.serviceUrl(
+                            isNotBlank(data.getBrokerServiceUrl()) ? 
data.getBrokerServiceUrl() : data.getServiceUrl());
                 }
-                return new PulsarClientImpl(clusterUrl, configuration, 
this.workerGroup);
+
+                // Share all the IO threads across broker and client 
connections
+                ClientConfigurationData conf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData();
+                return new PulsarClientImpl(conf, workerGroup);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c0e38399a..02055ac5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -104,24 +104,20 @@ protected void resetConfig() {
 
     protected final void internalSetup() throws Exception {
         init();
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
         lookupUrl = new URI(brokerUrl.toString());
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
         }
-        pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
+        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
     }
 
     protected final void internalSetupForStatsTest() throws Exception {
         init();
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setStatsInterval(1, TimeUnit.SECONDS);
         String lookupUrl = brokerUrl.toString();
         if (isTcpLookup) {
             lookupUrl = new URI("pulsar://localhost:" + 
BROKER_PORT).toString();
         }
-        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, 
TimeUnit.SECONDS).build();
     }
 
     protected final void init() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index fafc541c1..c3096ece1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -64,7 +64,7 @@ public void testClosedConsumer() throws PulsarClientException 
{
     public void testListener() throws PulsarClientException {
         Consumer consumer = null;
         ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setMessageListener((Consumer c, Message msg) -> {
+        conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
         });
         consumer = 
pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", 
"my-subscription", conf);
         Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 31c9921a1..886da5f5d 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -45,8 +45,8 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
@@ -62,7 +62,7 @@
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, 
MessageListener {
+public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, 
MessageListener<byte[]> {
 
     private static final long serialVersionUID = 1L;
 
@@ -74,7 +74,7 @@
     private final String groupId;
     private final boolean isAutoCommit;
 
-    private final ConcurrentMap<TopicPartition, 
org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TopicPartition, 
org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new 
ConcurrentHashMap<>();
 
     private final Map<TopicPartition, Long> lastReceivedOffset = new 
ConcurrentHashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = 
new ConcurrentHashMap<>();
@@ -84,10 +84,10 @@
     private final Properties properties;
 
     private static class QueueItem {
-        final org.apache.pulsar.client.api.Consumer consumer;
-        final Message message;
+        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+        final Message<byte[]> message;
 
-        QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message 
message) {
+        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, 
Message<byte[]> message) {
             this.consumer = consumer;
             this.message = message;
         }
@@ -146,19 +146,19 @@ private PulsarKafkaConsumer(ConsumerConfig config, 
Deserializer<K> keyDeserializ
 
         this.properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
-        ClientConfiguration clientConf = 
PulsarClientKafkaConfig.getClientConfiguration(properties);
+        ClientBuilder clientBuilder = 
PulsarClientKafkaConfig.getClientBuilder(properties);
         // Since this client instance is going to be used just for the 
consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
-        clientConf.setUseTcpNoDelay(false);
+        clientBuilder.enableTcpNoDelay(false);
         try {
-            client = PulsarClient.create(serviceUrl, clientConf);
+            client = clientBuilder.serviceUrl(serviceUrl).build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void received(org.apache.pulsar.client.api.Consumer consumer, 
Message msg) {
+    public void received(org.apache.pulsar.client.api.Consumer<byte[]> 
consumer, Message<byte[]> msg) {
         // Block listener thread if the application is slowing down
         try {
             receivedMessages.put(new QueueItem(consumer, msg));
@@ -204,16 +204,17 @@ public void subscribe(Collection<String> topics, 
ConsumerRebalanceListener callb
                 // acknowledgeCumulative()
                 int numberOfPartitions = ((PulsarClientImpl) 
client).getNumberOfPartitions(topic).get();
 
-                ConsumerConfiguration conf = 
PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
-                conf.setSubscriptionType(SubscriptionType.Failover);
-                conf.setMessageListener(this);
+                ConsumerBuilder<byte[]> consumerBuilder = 
PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
+                consumerBuilder.subscriptionType(SubscriptionType.Failover);
+                consumerBuilder.messageListener(this);
+                consumerBuilder.subscriptionName(groupId);
                 if (numberOfPartitions > 1) {
                     // Subscribe to each partition
-                    conf.setConsumerName(ConsumerName.generateRandomName());
+                    
consumerBuilder.consumerName(ConsumerName.generateRandomName());
                     for (int i = 0; i < numberOfPartitions; i++) {
                         String partitionName = 
TopicName.get(topic).getPartition(i).toString();
-                        
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
-                                .subscribeAsync(partitionName, groupId, conf);
+                        
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = 
consumerBuilder.clone()
+                                .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
                         TopicPartition tp = new TopicPartition(topic, 
partitionIndex);
                         future.thenAccept(consumer -> 
consumers.putIfAbsent(tp, consumer));
@@ -222,8 +223,8 @@ public void subscribe(Collection<String> topics, 
ConsumerRebalanceListener callb
                     }
                 } else {
                     // Topic has a single partition
-                    
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = 
client.subscribeAsync(topic,
-                            groupId, conf);
+                    
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = 
consumerBuilder.topic(topic)
+                            .subscribeAsync();
                     TopicPartition tp = new TopicPartition(topic, 0);
                     future.thenAccept(consumer -> consumers.putIfAbsent(tp, 
consumer));
                     futures.add(future);
@@ -293,7 +294,7 @@ public void unsubscribe() {
                 TopicName topicName = TopicName.get(item.consumer.getTopic());
                 String topic = topicName.getPartitionedTopicName();
                 int partition = topicName.isPartitioned() ? 
topicName.getPartitionIndex() : 0;
-                Message msg = item.message;
+                Message<byte[]> msg = item.message;
                 MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
                 long offset = MessageIdUtils.getOffset(msgId);
 
@@ -335,7 +336,7 @@ public void unsubscribe() {
     }
 
     @SuppressWarnings("unchecked")
-    private K getKey(String topic, Message msg) {
+    private K getKey(String topic, Message<byte[]> msg) {
         if (!msg.hasKey()) {
             return null;
         }
@@ -393,7 +394,7 @@ public void commitAsync(Map<TopicPartition, 
OffsetAndMetadata> offsets, OffsetCo
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         offsets.forEach((topicPartition, offsetAndMetadata) -> {
-            org.apache.pulsar.client.api.Consumer consumer = 
consumers.get(topicPartition);
+            org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
consumers.get(topicPartition);
 
             lastCommittedOffset.put(topicPartition, offsetAndMetadata);
             
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
@@ -415,7 +416,7 @@ public void commitAsync(Map<TopicPartition, 
OffsetAndMetadata> offsets, OffsetCo
     @Override
     public void seek(TopicPartition partition, long offset) {
         MessageId msgId = MessageIdUtils.getMessageId(offset);
-        org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
+        org.apache.pulsar.client.api.Consumer<byte[]> c = 
consumers.get(partition);
         if (c == null) {
             throw new IllegalArgumentException("Cannot seek on a partition 
where we are not subscribed");
         }
@@ -436,7 +437,7 @@ public void seekToBeginning(Collection<TopicPartition> 
partitions) {
         }
 
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = 
consumers.get(tp);
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a 
partition where we are not subscribed")));
@@ -457,7 +458,7 @@ public void seekToEnd(Collection<TopicPartition> 
partitions) {
         }
 
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = 
consumers.get(tp);
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a 
partition where we are not subscribed")));
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7b8bf9ab3..ae69c8575 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -38,12 +38,11 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -54,9 +53,9 @@
 public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     private final PulsarClient client;
-    private final ProducerConfiguration pulsarProducerConf;
+    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
 
-    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer> 
producers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, 
org.apache.pulsar.client.api.Producer<byte[]>> producers = new 
ConcurrentHashMap<>();
 
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
@@ -107,30 +106,29 @@ private PulsarKafkaProducer(Map<String, Object> conf, 
Properties properties, Ser
         }
 
         String serviceUrl = 
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
-        ClientConfiguration clientConf = 
PulsarClientKafkaConfig.getClientConfiguration(properties);
         try {
-            client = PulsarClient.create(serviceUrl, clientConf);
+            client = 
PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
 
-        pulsarProducerConf = 
PulsarProducerKafkaConfig.getProducerConfiguration(properties);
+        pulsarProducerBuilder = 
PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
 
         // To mimic the same batching mode as Kafka, we need to wait a very 
little amount of
         // time to batch if the client is trying to send messages fast enough
         long lingerMs = 
Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
-        pulsarProducerConf.setBatchingMaxPublishDelay(lingerMs, 
TimeUnit.MILLISECONDS);
+        pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, 
TimeUnit.MILLISECONDS);
 
         String compressionType = 
properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
         if ("gzip".equals(compressionType)) {
-            pulsarProducerConf.setCompressionType(CompressionType.ZLIB);
+            pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
         } else if ("lz4".equals(compressionType)) {
-            pulsarProducerConf.setCompressionType(CompressionType.LZ4);
+            pulsarProducerBuilder.compressionType(CompressionType.LZ4);
         }
 
-        pulsarProducerConf.setSendTimeout(
-                
Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
"60000")),
-                TimeUnit.MILLISECONDS);
+
+        int sendTimeoutMillis = 
Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
"60000"));
+        pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, 
TimeUnit.MILLISECONDS);
 
         boolean blockOnBufferFull = Boolean
                 
.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
 "false"));
@@ -138,8 +136,8 @@ private PulsarKafkaProducer(Map<String, Object> conf, 
Properties properties, Ser
         // Kafka blocking semantic when blockOnBufferFull=false is different 
from Pulsar client
         // Pulsar throws error immediately when the queue is full and 
blockIfQueueFull=false
         // Kafka, on the other hand, still blocks for "max.block.ms" time and 
then gives error.
-        boolean shouldBlockPulsarProducer = 
pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull;
-        pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer);
+        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || 
blockOnBufferFull;
+        pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
     }
 
     @Override
@@ -149,7 +147,7 @@ private PulsarKafkaProducer(Map<String, Object> conf, 
Properties properties, Ser
 
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback 
callback) {
-        org.apache.pulsar.client.api.Producer producer;
+        org.apache.pulsar.client.api.Producer<byte[]> producer;
 
         try {
             producer = producers.computeIfAbsent(record.topic(), topic -> 
createNewProducer(topic));
@@ -162,7 +160,7 @@ private PulsarKafkaProducer(Map<String, Object> conf, 
Properties properties, Ser
             return future;
         }
 
-        Message msg = getMessage(record);
+        Message<byte[]> msg = getMessage(record);
         int messageSize = msg.getData().length;
 
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
@@ -225,20 +223,20 @@ public void close(long timeout, TimeUnit unit) {
         }
     }
 
-    private org.apache.pulsar.client.api.Producer createNewProducer(String 
topic) {
+    private org.apache.pulsar.client.api.Producer<byte[]> 
createNewProducer(String topic) {
         try {
-            return client.createProducer(topic, pulsarProducerConf);
+            return pulsarProducerBuilder.clone().topic(topic).create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private Message getMessage(ProducerRecord<K, V> record) {
+    private Message<byte[]> getMessage(ProducerRecord<K, V> record) {
         if (record.partition() != null) {
             throw new UnsupportedOperationException("");
         }
 
-        MessageBuilder builder = MessageBuilder.create();
+        MessageBuilder<byte[]> builder = MessageBuilder.create();
         if (record.key() != null) {
             builder.setKey(getKey(record.topic(), record.key()));
         }
@@ -259,7 +257,7 @@ private String getKey(String topic, K key) {
         }
     }
 
-    private RecordMetadata getRecordMetadata(String topic, Message msg, 
MessageId messageId, int size) {
+    private RecordMetadata getRecordMetadata(String topic, Message<byte[]> 
msg, MessageId messageId, int size) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
 
         // Combine ledger id and entry id to form offset
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index d9ce75e6f..ca57f5b12 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -22,7 +22,8 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarClientKafkaConfig {
 
@@ -31,6 +32,7 @@
     public static final String USE_TLS = "pulsar.use.tls";
     public static final String TLS_TRUST_CERTS_FILE_PATH = 
"pulsar.tls.trust.certs.file.path";
     public static final String TLS_ALLOW_INSECURE_CONNECTION = 
"pulsar.tls.allow.insecure.connection";
+    public static final String TLS_HOSTNAME_VERIFICATION = 
"pulsar.tls.hostname.verification";
 
     public static final String OPERATION_TIMEOUT_MS = 
"pulsar.operation.timeout.ms";
     public static final String STATS_INTERVAL_SECONDS = 
"pulsar.stats.interval.seconds";
@@ -43,8 +45,8 @@
     public static final String CONCURRENT_LOOKUP_REQUESTS = 
"pulsar.concurrent.lookup.requests";
     public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION 
= "pulsar.max.number.rejected.request.per.connection";
 
-    public static ClientConfiguration getClientConfiguration(Properties 
properties) {
-        ClientConfiguration conf = new ClientConfiguration();
+    public static ClientBuilder getClientBuilder(Properties properties) {
+        ClientBuilder clientBuilder = PulsarClient.builder();
 
         if (properties.containsKey(AUTHENTICATION_CLASS)) {
             String className = properties.getProperty(AUTHENTICATION_CLASS);
@@ -52,48 +54,53 @@ public static ClientConfiguration 
getClientConfiguration(Properties properties)
                 @SuppressWarnings("unchecked")
                 Class<Authentication> clazz = (Class<Authentication>) 
Class.forName(className);
                 Authentication auth = clazz.newInstance();
-                conf.setAuthentication(auth);
+                clientBuilder.authentication(auth);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
 
-        conf.setUseTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, 
"false")));
-        
conf.setUseTls(Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION,
 "false")));
+        
clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, 
"false")));
+        clientBuilder.allowTlsInsecureConnection(
+                
Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, 
"false")));
+        clientBuilder.enableTlsHostnameVerification(
+                
Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, 
"false")));
+
         if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
-            
conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+            
clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
         }
 
         if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
-            
conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+            
clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
                     TimeUnit.MILLISECONDS);
         }
 
         if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
-            
conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
 TimeUnit.SECONDS);
+            
clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+                    TimeUnit.SECONDS);
         }
 
         if (properties.containsKey(NUM_IO_THREADS)) {
-            
conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+            
clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
         }
 
         if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
-            
conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+            
clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
         }
 
         if (properties.containsKey(USE_TCP_NODELAY)) {
-            
conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+            
clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
         }
 
         if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
-            
conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+            
clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
         }
 
         if 
(properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
-            conf.setMaxNumberOfRejectedRequestPerConnection(
+            clientBuilder.maxNumberOfRejectedRequestPerConnection(
                     
Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
         }
 
-        return conf;
+        return clientBuilder;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index f91c48485..4addfb709 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -20,7 +20,8 @@
 
 import java.util.Properties;
 
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarConsumerKafkaConfig {
 
@@ -29,22 +30,22 @@
     public static final String RECEIVER_QUEUE_SIZE = 
"pulsar.consumer.receiver.queue.size";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = 
"pulsar.consumer.total.receiver.queue.size.across.partitions";
 
-    public static ConsumerConfiguration getConsumerConfiguration(Properties 
properties) {
-        ConsumerConfiguration conf = new ConsumerConfiguration();
+    public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient 
client, Properties properties) {
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
 
         if (properties.containsKey(CONSUMER_NAME)) {
-            conf.setConsumerName(properties.getProperty(CONSUMER_NAME));
+            
consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
         }
 
         if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
-            
conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+            
consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
         }
 
         if 
(properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
-            conf.setMaxTotalReceiverQueueSizeAcrossPartitions(
+            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
                     
Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
         }
 
-        return conf;
+        return consumerBuilder;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index c2e4886bc..5a9a65152 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -20,7 +20,8 @@
 
 import java.util.Properties;
 
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 
 public class PulsarProducerKafkaConfig {
 
@@ -33,32 +34,32 @@
     public static final String BATCHING_ENABLED = 
"pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = 
"pulsar.producer.batching.max.messages";
 
-    public static ProducerConfiguration getProducerConfiguration(Properties 
properties) {
-        ProducerConfiguration conf = new ProducerConfiguration();
+    public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient 
client, Properties properties) {
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
 
         if (properties.containsKey(PRODUCER_NAME)) {
-            conf.setProducerName(properties.getProperty(PRODUCER_NAME));
+            
producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
         }
 
         if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
-            
conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+            
producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
         }
 
         if (properties.containsKey(MAX_PENDING_MESSAGES)) {
-            
conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+            
producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
         }
 
         if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
-            conf.setMaxPendingMessagesAcrossPartitions(
+            producerBuilder.maxPendingMessagesAcrossPartitions(
                     
Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
         }
 
-        
conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED,
 "true")));
+        
producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED,
 "true")));
 
         if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
-            
conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+            
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
-        return conf;
+        return producerBuilder;
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index d9dcb1c9c..e4457dfcc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -100,7 +100,7 @@
      *
      * @param topicsPattern
      */
-    ConsumerBuilder topicsPattern(Pattern topicsPattern);
+    ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
 
     /**
      * Specify a pattern for topics that this consumer will subscribe on.
@@ -111,7 +111,7 @@
      * @param topicsPattern
      *            given regular expression for topics pattern
      */
-    ConsumerBuilder topicsPattern(String topicsPattern);
+    ConsumerBuilder<T> topicsPattern(String topicsPattern);
 
     /**
      * Specify the subscription name for this consumer.
@@ -238,7 +238,7 @@
      * @param periodInMinutes
      *            whether to read from the compacted topic
      */
-    ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes);
+    ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
 
     /**
      * Sets priority level for the shared subscription consumers to which 
broker gives more priority while dispatching
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 7761b9db9..c526af5b9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -95,7 +95,7 @@ public ConsumerConfiguration 
setSubscriptionType(SubscriptionType subscriptionTy
     /**
      * @return the configured {@link MessageListener} for the consumer
      */
-    public MessageListener getMessageListener() {
+    public MessageListener<byte[]> getMessageListener() {
         return conf.getMessageListener();
     }
 
@@ -108,7 +108,7 @@ public MessageListener getMessageListener() {
      * @param messageListener
      *            the listener object
      */
-    public ConsumerConfiguration setMessageListener(MessageListener 
messageListener) {
+    public ConsumerConfiguration setMessageListener(MessageListener<byte[]> 
messageListener) {
         checkNotNull(messageListener);
         conf.setMessageListener(messageListener);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
index aeb8bbbb5..e2e627414 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -26,11 +26,11 @@
     /**
      * Notified when the consumer group is changed, and the consumer becomes 
the active consumer.
      */
-    void becameActive(Consumer consumer, int partitionId);
+    void becameActive(Consumer<?> consumer, int partitionId);
 
     /**
      * Notified when the consumer group is changed, and the consumer is still 
inactive or becomes inactive.
      */
-    void becameInactive(Consumer consumer, int partitionId);
+    void becameInactive(Consumer<?> consumer, int partitionId);
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index a9cef6f73..bc2b91536 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -30,7 +30,7 @@
      * @deprecated since 1.22.0. Please use {@link #choosePartition(Message, 
TopicMetadata)} instead.
      */
     @Deprecated
-    default int choosePartition(Message msg) {
+    default int choosePartition(Message<?> msg) {
         throw new UnsupportedOperationException("Use #choosePartition(Message, 
TopicMetadata) instead");
     }
 
@@ -42,7 +42,7 @@ default int choosePartition(Message msg) {
      * @return the partition to route the message.
      * @since 1.22.0
      */
-    default int choosePartition(Message msg, TopicMetadata metadata) {
+    default int choosePartition(Message<?> msg, TopicMetadata metadata) {
         return choosePartition(msg);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index a037cfe45..16dcd009c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -262,7 +262,7 @@
      *            maximum number of messages in a batch
      * @return
      */
-    ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
+    ProducerBuilder<T> batchingMaxMessages(int 
batchMessagesMaxMessagesPerBatch);
 
     /**
      * Set the baseline for the sequence ids for messages published by the 
producer.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index c81b2eb35..f40a8a455 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -99,7 +99,7 @@
      * @param readerListener
      *            the listener object
      */
-    ReaderBuilder<T> readerListener(ReaderListener readerListener);
+    ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
 
     /**
      * Sets a {@link CryptoKeyReader}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index d329d09d4..a97e5249c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -53,7 +53,7 @@
     // sequence id for this batch which will be persisted as a single entry by 
broker
     long sequenceId = -1;
     ByteBuf batchedMessageMetadataAndPayload;
-    List<MessageImpl> messages = Lists.newArrayList();
+    List<MessageImpl<?>> messages = Lists.newArrayList();
     // keep track of callbacks for individual messages being published in a 
batch
     SendCallback firstCallback;
 
@@ -73,13 +73,13 @@
         this.producerName = producerName;
     }
 
-    boolean hasSpaceInBatch(MessageImpl msg) {
+    boolean hasSpaceInBatch(MessageImpl<?> msg) {
         int messageSize = msg.getDataBuffer().readableBytes();
         return ((messageSize + currentBatchSizeBytes) <= 
MAX_MESSAGE_BATCH_SIZE_BYTES
                 && numMessagesInBatch < maxNumMessagesInBatch);
     }
 
-    void add(MessageImpl msg, SendCallback callback) {
+    void add(MessageImpl<?> msg, SendCallback callback) {
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] add message to batch, num messages in batch 
so far {}", topicName, producerName,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 4cffb7fa8..3effc7f3b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -154,4 +154,8 @@ public ClientBuilder 
maxNumberOfRejectedRequestPerConnection(int maxNumberOfReje
         
conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
         return this;
     }
+
+    public ClientConfigurationData getClientConfigurationData() {
+        return conf;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c2d758c7..37904fdfc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -86,8 +86,8 @@
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
         16, 1);
 
-    private final ConcurrentLongHashMap<ProducerImpl> producers = new 
ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<ConsumerImpl> consumers = new 
ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new 
ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new 
ConcurrentLongHashMap<>(16, 1);
 
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final Semaphore pendingLookupRequestSemaphore;
@@ -253,7 +253,7 @@ protected void handleMessage(CommandMessage cmdMessage, 
ByteBuf headersAndPayloa
         if (log.isDebugEnabled()) {
             log.debug("{} Received a message from the server: {}", 
ctx.channel(), cmdMessage);
         }
-        ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId());
+        ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
             consumer.messageReceived(cmdMessage.getMessageId(), 
headersAndPayload, this);
         }
@@ -266,7 +266,7 @@ protected void 
handleActiveConsumerChange(CommandActiveConsumerChange change) {
         if (log.isDebugEnabled()) {
             log.debug("{} Received a consumer group change message from the 
server : {}", ctx.channel(), change);
         }
-        ConsumerImpl consumer = consumers.get(change.getConsumerId());
+        ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
         if (consumer != null) {
             consumer.activeConsumerChanged(change.getIsActive());
         }
@@ -398,7 +398,7 @@ protected void 
handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
 
         log.info("[{}] Broker notification reached the end of topic: {}", 
remoteAddress, consumerId);
 
-        ConsumerImpl consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.get(consumerId);
         if (consumer != null) {
             consumer.setTerminated();
         }
@@ -472,7 +472,7 @@ protected void handleError(CommandError error) {
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         log.info("[{}] Broker notification of Closed producer: {}", 
remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
-        ProducerImpl producer = producers.get(producerId);
+        ProducerImpl<?> producer = producers.get(producerId);
         if (producer != null) {
             producer.connectionClosed(this);
         } else {
@@ -484,7 +484,7 @@ protected void handleCloseProducer(CommandCloseProducer 
closeProducer) {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", 
remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.get(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
@@ -666,11 +666,11 @@ private boolean verifyTlsHostName(String hostname, 
ChannelHandlerContext ctx) {
         return false;
     }
 
-    void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
+    void registerConsumer(final long consumerId, final ConsumerImpl<?> 
consumer) {
         consumers.put(consumerId, consumer);
     }
 
-    void registerProducer(final long producerId, final ProducerImpl producer) {
+    void registerProducer(final long producerId, final ProducerImpl<?> 
producer) {
         producers.put(producerId, producer);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index af6b732b7..f51b1b3db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,7 +50,7 @@
     }
 
     protected final String subscription;
-    protected final ConsumerConfigurationData conf;
+    protected final ConsumerConfigurationData<T> conf;
     protected final String consumerName;
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
     protected final MessageListener<T> listener;
@@ -168,7 +168,7 @@ protected ConsumerBase(PulsarClientImpl client, String 
topic, ConsumerConfigurat
     abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) 
throws PulsarClientException;
 
     @Override
-    public void acknowledge(Message message) throws PulsarClientException {
+    public void acknowledge(Message<?> message) throws PulsarClientException {
         try {
             acknowledge(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -194,7 +194,7 @@ public void acknowledge(MessageId messageId) throws 
PulsarClientException {
     }
 
     @Override
-    public void acknowledgeCumulative(Message message) throws 
PulsarClientException {
+    public void acknowledgeCumulative(Message<?> message) throws 
PulsarClientException {
         try {
             acknowledgeCumulative(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -220,7 +220,7 @@ public void acknowledgeCumulative(MessageId messageId) 
throws PulsarClientExcept
     }
 
     @Override
-    public CompletableFuture<Void> acknowledgeAsync(Message message) {
+    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
         try {
             return acknowledgeAsync(message.getMessageId());
         } catch (NullPointerException npe) {
@@ -229,7 +229,7 @@ public void acknowledgeCumulative(MessageId messageId) 
throws PulsarClientExcept
     }
 
     @Override
-    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message message) 
{
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> 
message) {
         try {
             return acknowledgeCumulativeAsync(message.getMessageId());
         } catch (NullPointerException npe) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index e7163c9fb..d8507bbe0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -206,9 +206,8 @@ private ConsumerBuilderImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T
     }
 
     @Override
-    public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) {
+    public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
         conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
         return this;
     }
-
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8499565ae..183651a1e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -82,6 +82,7 @@
 
     // Number of messages that have delivered to the application. Every once 
in a while, this number will be sent to the
     // broker to notify that we are ready to get (and store in the incoming 
messages queue) more messages
+    @SuppressWarnings("rawtypes")
     private static final AtomicIntegerFieldUpdater<ConsumerImpl> 
AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(ConsumerImpl.class, "availablePermits");
     @SuppressWarnings("unused")
@@ -288,7 +289,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
             do {
                 message = incomingMessages.take();
                 lastDequeuedMessage = message.getMessageId();
-                ClientCnx msgCnx = ((MessageImpl) message).getCnx();
+                ClientCnx msgCnx = ((MessageImpl<?>) message).getCnx();
                 // synchronized need to prevent race between connectionOpened 
and the check "msgCnx == cnx()"
                 synchronized (ConsumerImpl.this) {
                     // if message received due to an old flow - discard it and 
wait for the message from the
@@ -631,7 +632,7 @@ void connectionOpened(final ClientCnx cnx) {
      * not seen by the application
      */
     private BatchMessageIdImpl clearReceiverQueue() {
-        List<Message> currentMessageQueue = new 
ArrayList<>(incomingMessages.size());
+        List<Message<?>> currentMessageQueue = new 
ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
         if (!currentMessageQueue.isEmpty()) {
             MessageIdImpl nextMessageInQueue = (MessageIdImpl) 
currentMessageQueue.get(0).getMessageId();
@@ -984,9 +985,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
      *
      * Periodically, it sends a Flow command to notify the broker that it can 
push more messages
      */
-    protected synchronized void messageProcessed(Message msg) {
+    protected synchronized void messageProcessed(Message<?> msg) {
         ClientCnx currentCnx = cnx();
-        ClientCnx msgCnx = ((MessageImpl) msg).getCnx();
+        ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
         lastDequeuedMessage = msg.getMessageId();
 
         if (msgCnx != currentCnx) {
@@ -1371,7 +1372,7 @@ private void internalGetLastMessageIdAsync(final Backoff 
backoff,
         }
     }
 
-    private MessageIdImpl getMessageIdImpl(Message msg) {
+    private MessageIdImpl getMessageIdImpl(Message<?> msg) {
         MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
         if (messageId instanceof BatchMessageIdImpl) {
             // messageIds contain MessageIdImpl, not BatchMessageIdImpl
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
index 3ef26c831..21ff3c436 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
@@ -41,7 +41,7 @@
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
     private Timeout statTimeout;
-    private ConsumerImpl consumer;
+    private ConsumerImpl<?> consumer;
     private PulsarClientImpl pulsarClient;
     private long oldTime;
     private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public ConsumerStats() {
         throughputFormat = null;
     }
 
-    public ConsumerStats(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData conf, ConsumerImpl consumer) {
+    public ConsumerStats(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = 
pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -92,7 +92,7 @@ public ConsumerStats(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData co
         init(conf);
     }
 
-    private void init(ConsumerConfigurationData conf) {
+    private void init(ConsumerConfigurationData<?> conf) {
         ObjectMapper m = new ObjectMapper();
         m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         ObjectWriter w = m.writerWithDefaultPrettyPrinter();
@@ -149,7 +149,7 @@ private void init(ConsumerConfigurationData conf) {
         statTimeout = pulsarClient.timer().newTimeout(stat, 
statsIntervalSeconds, TimeUnit.SECONDS);
     }
 
-    void updateNumMsgsReceived(Message message) {
+    void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
             numBytesReceived.add(message.getData().length);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index fdcaf6378..08b189cc1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -24,7 +24,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    void updateNumMsgsReceived(Message message) {
+    void updateNumMsgsReceived(Message<?> message) {
         // Do nothing
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f4585ce23..8fcc40ad2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -56,7 +56,8 @@
 
     // Constructor for out-going message
     static <T> MessageImpl<T> create(MessageMetadata.Builder 
msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
-        MessageImpl<T> msg = RECYCLER.get();
+        @SuppressWarnings("unchecked")
+        MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
         msg.msgMetadataBuilder = msgMetadataBuilder;
         msg.messageId = null;
         msg.cnx = null;
@@ -67,7 +68,8 @@
     }
 
     static MessageImpl<byte[]> create(MessageMetadata.Builder 
msgMetadataBuilder, ByteBuffer payload) {
-        MessageImpl<byte[]> msg = RECYCLER.get();
+        @SuppressWarnings("unchecked")
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
         msg.msgMetadataBuilder = msgMetadataBuilder;
         msg.messageId = null;
         msg.cnx = null;
@@ -141,8 +143,9 @@ public MessageImpl(String msgId, Map<String, String> 
properties, ByteBuf payload
         this.properties = Collections.unmodifiableMap(properties);
     }
 
-    public static MessageImpl deserialize(ByteBuf headersAndPayload) throws 
IOException {
-        MessageImpl msg = RECYCLER.get();
+    public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) 
throws IOException {
+        @SuppressWarnings("unchecked")
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
 
         msg.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -287,16 +290,16 @@ public void recycle() {
         }
     }
 
-    private MessageImpl(Handle<MessageImpl> recyclerHandle) {
+    private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
         this.recyclerHandle = recyclerHandle;
     }
 
-    private Handle<MessageImpl> recyclerHandle;
+    private Handle<MessageImpl<?>> recyclerHandle;
 
-    private final static Recycler<MessageImpl> RECYCLER = new 
Recycler<MessageImpl>() {
+    private final static Recycler<MessageImpl<?>> RECYCLER = new 
Recycler<MessageImpl<?>>() {
         @Override
-        protected MessageImpl newObject(Handle<MessageImpl> handle) {
-            return new MessageImpl(handle);
+        protected MessageImpl<?> newObject(Handle<MessageImpl<?>> handle) {
+            return new MessageImpl<>(handle);
         }
     };
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index f662ccf1d..df6894c47 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,7 +20,6 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +33,7 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -47,13 +47,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
 
     private final List<ConsumerImpl<T>> consumers;
 
     // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
     // shared incoming queue was full
-    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+    private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
     // Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
     // resume receiving from the paused consumer partitions
@@ -165,7 +167,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
         try {
             if (incomingMessages.size() <= sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty()) {
                 while (true) {
-                    ConsumerImpl consumer = pausedConsumers.poll();
+                    ConsumerImpl<T> consumer = pausedConsumers.poll();
                     if (consumer == null) {
                         break;
                     }
@@ -360,12 +362,7 @@ private void failPendingReceive() {
 
     @Override
     public boolean isConnected() {
-        for (ConsumerImpl consumer : consumers) {
-            if (!consumer.isConnected()) {
-                return false;
-            }
-        }
-        return true;
+        return consumers.stream().allMatch(ConsumerImpl::isConnected);
     }
 
     @Override
@@ -442,6 +439,7 @@ String getHandlerName() {
         if (null != conf.getConsumerEventListener()) {
             
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
         }
+
         int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
                 conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
         internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
@@ -460,7 +458,7 @@ String getHandlerName() {
     @Override
     public void redeliverUnacknowledgedMessages() {
         synchronized (this) {
-            for (ConsumerImpl c : consumers) {
+            for (ConsumerImpl<T> c : consumers) {
                 c.redeliverUnacknowledgedMessages();
             }
             incomingMessages.clear();
@@ -509,11 +507,7 @@ public void seek(MessageId messageId) throws 
PulsarClientException {
      * @return true if all batch messages have been acknowledged
      */
     public boolean isBatchingAckTrackerEmpty() {
-        boolean state = true;
-        for (Consumer consumer : consumers) {
-            state &= ((ConsumerImpl) consumer).isBatchingAckTrackerEmpty();
-        }
-        return state;
+        return 
consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
     }
 
     List<ConsumerImpl<T>> getConsumers() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index e70ce4b2b..5281d5104 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -161,14 +161,8 @@ private void start() {
 
     @Override
     public boolean isConnected() {
-        for (ProducerImpl producer : producers) {
-            // returns false if any of the partition is not connected
-            if (!producer.isConnected()) {
-                return false;
-            }
-        }
-
-        return true;
+        // returns false if any of the partition is not connected
+        return producers.stream().allMatch(ProducerImpl::isConnected);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b55f27914..15779cd92 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -104,6 +104,7 @@
 
     private final Map<String, String> metadata;
 
+    @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> 
msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
@@ -184,7 +185,7 @@ public long getLastSequenceId() {
     }
 
     @Override
-    public CompletableFuture<MessageId> sendAsync(Message message) {
+    public CompletableFuture<MessageId> sendAsync(Message<T> message) {
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
         sendAsync(message, new SendCallback() {
@@ -232,7 +233,7 @@ public void addCallback(SendCallback scb) {
         return future;
     }
 
-    public void sendAsync(Message message, SendCallback callback) {
+    public void sendAsync(Message<T> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
         if (!isValidProducerState(callback)) {
@@ -243,7 +244,7 @@ public void sendAsync(Message message, SendCallback 
callback) {
             return;
         }
 
-        MessageImpl msg = (MessageImpl) message;
+        MessageImpl<T> msg = (MessageImpl<T>) message;
         MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
 
@@ -387,7 +388,7 @@ private ByteBufPair sendMessage(long producerId, long 
sequenceId, int numMessage
         return Commands.newSend(producerId, sequenceId, numMessages, 
checksumType, msgMetadata, compressedPayload);
     }
 
-    private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, 
ByteBuf payload) {
+    private void doBatchSendAndAdd(MessageImpl<T> msg, SendCallback callback, 
ByteBuf payload) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Closing out batch to accomodate large message 
with size {}", topic, producerName,
                     msg.getDataBuffer().readableBytes());
@@ -440,12 +441,12 @@ private boolean canEnqueueRequest(SendCallback callback) {
     }
 
     private static final class WriteInEventLoopCallback implements Runnable {
-        private ProducerImpl producer;
+        private ProducerImpl<?> producer;
         private ByteBufPair cmd;
         private long sequenceId;
         private ClientCnx cnx;
 
-        static WriteInEventLoopCallback create(ProducerImpl producer, 
ClientCnx cnx, OpSendMsg op) {
+        static WriteInEventLoopCallback create(ProducerImpl<?> producer, 
ClientCnx cnx, OpSendMsg op) {
             WriteInEventLoopCallback c = RECYCLER.get();
             c.producer = producer;
             c.cnx = cnx;
@@ -735,8 +736,8 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg 
op) {
     }
 
     protected static final class OpSendMsg {
-        MessageImpl msg;
-        List<MessageImpl> msgs;
+        MessageImpl<?> msg;
+        List<MessageImpl<?>> msgs;
         ByteBufPair cmd;
         SendCallback callback;
         long sequenceId;
@@ -744,7 +745,7 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg 
op) {
         long batchSizeByte = 0;
         int numMessagesInBatch = 1;
 
-        static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long 
sequenceId, SendCallback callback) {
+        static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long 
sequenceId, SendCallback callback) {
             OpSendMsg op = RECYCLER.get();
             op.msg = msg;
             op.cmd = cmd;
@@ -754,7 +755,7 @@ static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, 
long sequenceId, SendC
             return op;
         }
 
-        static OpSendMsg create(List<MessageImpl> msgs, ByteBufPair cmd, long 
sequenceId, SendCallback callback) {
+        static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, 
long sequenceId, SendCallback callback) {
             OpSendMsg op = RECYCLER.get();
             op.msgs = msgs;
             op.cmd = cmd;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
index 6a5e321ff..262800336 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
@@ -41,7 +41,7 @@
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
     private Timeout statTimeout;
-    private ProducerImpl producer;
+    private ProducerImpl<?> producer;
     private PulsarClientImpl pulsarClient;
     private long oldTime;
     private long statsIntervalSeconds;
@@ -74,7 +74,7 @@ public ProducerStats() {
         ds = null;
     }
 
-    public ProducerStats(PulsarClientImpl pulsarClient, 
ProducerConfigurationData conf, ProducerImpl producer) {
+    public ProducerStats(PulsarClientImpl pulsarClient, 
ProducerConfigurationData conf, ProducerImpl<?> producer) {
         this.pulsarClient = pulsarClient;
         this.statsIntervalSeconds = 
pulsarClient.getConfiguration().getStatsIntervalSeconds();
         this.producer = producer;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1741eeba9..47e953b5f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.collect.Lists;
@@ -256,7 +255,7 @@ public ClientConfigurationData getConfiguration() {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
 
-            ProducerBase producer;
+            ProducerBase<T> producer;
             if (metadata.partitions > 1) {
                 producer = new 
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, 
metadata.partitions,
                         producerCreatedFuture, schema);
@@ -437,7 +436,7 @@ public ClientConfigurationData getConfiguration() {
 
                 List<String> topicsList = topicsPatternFilter(topics, 
conf.getTopicsPattern());
                 conf.getTopicNames().addAll(topicsList);
-                ConsumerBase consumer = new 
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+                ConsumerBase<T> consumer = new 
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
                     externalExecutorProvider.getExecutor(),
@@ -587,12 +586,12 @@ public void close() throws PulsarClientException {
         synchronized (producers) {
             // Copy to a new list, because the closing will trigger a removal 
from the map
             // and invalidate the iterator
-            List<ProducerBase> producersToClose = 
Lists.newArrayList(producers.keySet());
+            List<ProducerBase<?>> producersToClose = 
Lists.newArrayList(producers.keySet());
             producersToClose.forEach(p -> futures.add(p.closeAsync()));
         }
 
         synchronized (consumers) {
-            List<ConsumerBase> consumersToClose = 
Lists.newArrayList(consumers.keySet());
+            List<ConsumerBase<?>> consumersToClose = 
Lists.newArrayList(consumers.keySet());
             consumersToClose.forEach(c -> futures.add(c.closeAsync()));
         }
 
@@ -688,13 +687,13 @@ private static EventLoopGroup 
getEventLoopGroup(ClientConfigurationData conf) {
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), 
threadFactory);
     }
 
-    void cleanupProducer(ProducerBase producer) {
+    void cleanupProducer(ProducerBase<?> producer) {
         synchronized (producers) {
             producers.remove(producer);
         }
     }
 
-    void cleanupConsumer(ConsumerBase consumer) {
+    void cleanupConsumer(ConsumerBase<?> consumer) {
         synchronized (consumers) {
             consumers.remove(consumer);
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index bef909d98..4ab5f423b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -43,16 +43,17 @@
     private final Schema<T> schema;
 
     ReaderBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
-        this(client, new ReaderConfigurationData(), schema);
+        this(client, new ReaderConfigurationData<T>(), schema);
     }
 
-    private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData 
conf, Schema<T> schema) {
+    private ReaderBuilderImpl(PulsarClientImpl client, 
ReaderConfigurationData<T> conf, Schema<T> schema) {
         this.client = client;
         this.conf = conf;
         this.schema = schema;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public ReaderBuilder<T> clone() {
         try {
             return (ReaderBuilder<T>) super.clone();
@@ -106,7 +107,7 @@ private ReaderBuilderImpl(PulsarClientImpl client, 
ReaderConfigurationData conf,
     }
 
     @Override
-    public ReaderBuilder<T> readerListener(ReaderListener readerListener) {
+    public ReaderBuilder<T> readerListener(ReaderListener<T> readerListener) {
         conf.setReaderListener(readerListener);
         return this;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index daf4799fe..ed374f663 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -90,7 +90,7 @@ public String getTopic() {
         return consumer.getTopic();
     }
 
-    public ConsumerImpl getConsumer() {
+    public ConsumerImpl<T> getConsumer() {
         return consumer;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 6b3f93794..ce67f5b46 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -39,7 +39,7 @@ public RoundRobinPartitionMessageRouterImpl(HashingScheme 
hashingScheme) {
     }
 
     @Override
-    public int choosePartition(Message msg, TopicMetadata topicMetadata) {
+    public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
         // If the message has a key, it supersedes the round robin routing 
policy
         if (msg.hasKey()) {
             return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index d95d83cb7..433744273 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -31,9 +31,10 @@
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
@@ -95,19 +96,20 @@ public ProxyService(ProxyConfiguration proxyConfig) throws 
IOException {
         this.acceptorGroup  = EventLoopUtil.newEventLoopGroup(1, 
acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, 
workersThreadFactory);
 
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(serviceUrl);
         if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            
clientConfiguration.setAuthentication(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters());
+            
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                    proxyConfig.getBrokerClientAuthenticationParameters()));
         }
         if (proxyConfig.isTlsEnabledWithBroker()) {
-            clientConfiguration.setUseTls(true);
-            
clientConfiguration.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
-            
clientConfiguration.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+            clientConf.setUseTls(true);
+            
clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+            
clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
         }
 
-        this.client = new PulsarClientImpl(serviceUrl, clientConfiguration, 
workerGroup);
-        this.clientAuthentication = clientConfiguration.getAuthentication();
+        this.client = new PulsarClientImpl(clientConf, workerGroup);
+        this.clientAuthentication = clientConf.getAuthentication();
     }
 
     public void start() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index d4b8e4d86..f98c48d82 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -25,11 +25,9 @@
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import 
org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication;
@@ -142,8 +140,7 @@ private void createAdminClient() throws 
PulsarClientException {
     }
 
     private PulsarClient createPulsarClient(String proxyServiceUrl, String 
authParams) throws PulsarClientException {
-        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
-        clientConf.setAuthentication(BasicAuthentication.class.getName(), 
authParams);
-        return PulsarClient.create(proxyServiceUrl, clientConf);
+        return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+                .authentication(BasicAuthentication.class.getName(), 
authParams).build();
     }
 }
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 3c43611c6..4291fa8fd 100644
--- 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -33,7 +33,7 @@
      * @param msg
      * @return
      */
-    public Values toValues(Message msg);
+    public Values toValues(Message<byte[]> msg);
 
     /**
      * Declare the output schema for the spout.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index a293fd694..0aa1ee35f 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -18,29 +18,34 @@
  */
 package org.apache.pulsar.storm;
 
+import static java.lang.String.format;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
-import static java.lang.String.format;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("deprecation")
 public class PulsarBolt extends BaseRichBolt implements IMetric {
     /**
      *
@@ -52,8 +57,8 @@
     public static final String PRODUCER_RATE = "producerRate";
     public static final String PRODUCER_THROUGHPUT_BYTES = 
"producerThroughput";
 
-    private final ClientConfiguration clientConf;
-    private final ProducerConfiguration producerConf;
+    private final ClientConfigurationData clientConf;
+    private final ProducerConfigurationData producerConf;
     private final PulsarBoltConfiguration pulsarBoltConf;
     private final ConcurrentMap<String, Object> metricsMap = 
Maps.newConcurrentMap();
 
@@ -65,17 +70,39 @@
     private volatile long messagesSent = 0;
     private volatile long messageSizeSent = 0;
 
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder 
clientBuilder) {
+        this.clientConf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone();
+        this.producerConf = new ProducerConfigurationData();
+        Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
+        Preconditions.checkNotNull(pulsarBoltConf.getTopic());
+        Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+        this.pulsarBoltConf = pulsarBoltConf;
+    }
+
+    /**
+     * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, 
ClientBuilder)}
+     */
+    @Deprecated
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, 
ClientConfiguration clientConf) {
         this(pulsarBoltConf, clientConf, new ProducerConfiguration());
     }
 
+    /**
+     * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, 
ClientBuilder)}
+     */
+    @Deprecated
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, 
ClientConfiguration clientConf,
             ProducerConfiguration producerConf) {
-        this.clientConf = clientConf;
-        this.producerConf = producerConf;
+        this.clientConf = clientConf.getConfigurationData().clone();
+        this.producerConf = 
producerConf.getProducerConfigurationData().clone();
         Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
         Preconditions.checkNotNull(pulsarBoltConf.getTopic());
         Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+        this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+        this.producerConf.setTopicName(pulsarBoltConf.getTopic());
         this.pulsarBoltConf = pulsarBoltConf;
     }
 
@@ -86,8 +113,8 @@ public void prepare(Map conf, TopologyContext context, 
OutputCollector collector
         this.boltId = String.format("%s-%s", componentId, 
context.getThisTaskId());
         this.collector = collector;
         try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, 
pulsarBoltConf.getServiceUrl(), clientConf);
-            producer = 
sharedPulsarClient.getSharedProducer(pulsarBoltConf.getTopic(), producerConf);
+            sharedPulsarClient = SharedPulsarClient.get(componentId, 
clientConf);
+            producer = sharedPulsarClient.getSharedProducer(producerConf);
             LOG.info("[{}] Created a pulsar producer on topic {} to send 
messages", boltId, pulsarBoltConf.getTopic());
         } catch (PulsarClientException e) {
             LOG.error("[{}] Error initializing pulsar producer on topic {}", 
boltId, pulsarBoltConf.getTopic(), e);
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 639adb915..af26035ed 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -22,15 +22,11 @@
 
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -38,15 +34,25 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.Backoff;
-
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.com.google.common.collect.Sets;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+@SuppressWarnings("deprecation")
 public class PulsarSpout extends BaseRichSpout implements IMetric {
 
     private static final long serialVersionUID = 1L;
@@ -59,37 +65,61 @@
     public static final String CONSUMER_RATE = "consumerRate";
     public static final String CONSUMER_THROUGHPUT_BYTES = 
"consumerThroughput";
 
-    private final ClientConfiguration clientConf;
-    private final ConsumerConfiguration consumerConf;
+    private final ClientConfigurationData clientConf;
+    private final ConsumerConfigurationData<byte[]> consumerConf;
     private final PulsarSpoutConfiguration pulsarSpoutConf;
     private final long failedRetriesTimeoutNano;
     private final int maxFailedRetries;
     private final ConcurrentMap<MessageId, MessageRetries> 
pendingMessageRetries = Maps.newConcurrentMap();
-    private final Queue<Message> failedMessages = 
Queues.newConcurrentLinkedQueue();
+    private final Queue<Message<byte[]>> failedMessages = 
Queues.newConcurrentLinkedQueue();
     private final ConcurrentMap<String, Object> metricsMap = 
Maps.newConcurrentMap();
 
     private SharedPulsarClient sharedPulsarClient;
     private String componentId;
     private String spoutId;
     private SpoutOutputCollector collector;
-    private Consumer consumer;
+    private Consumer<byte[]> consumer;
     private volatile long messagesReceived = 0;
     private volatile long messagesEmitted = 0;
     private volatile long pendingAcks = 0;
     private volatile long messageSizeReceived = 0;
 
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder 
clientBuilder) {
+        Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
+        Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
+        Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
+        Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        this.clientConf = ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData().clone();
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf = new ConsumerConfigurationData<>();
+        
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
+        this.pulsarSpoutConf = pulsarSpoutConf;
+        this.failedRetriesTimeoutNano = 
pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+    }
+
+    @Deprecated
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, 
ClientConfiguration clientConf) {
         this(pulsarSpoutConf, clientConf, new ConsumerConfiguration());
     }
 
+    @Deprecated
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, 
ClientConfiguration clientConf,
             ConsumerConfiguration consumerConf) {
-        this.clientConf = clientConf;
-        this.consumerConf = consumerConf;
+        this.clientConf = clientConf.getConfigurationData().clone();
+        this.consumerConf = consumerConf.getConfigurationData().clone();
         Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
         Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
         Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
         Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+        this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+        
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+
         this.pulsarSpoutConf = pulsarSpoutConf;
         this.failedRetriesTimeoutNano = 
pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
         this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -115,7 +145,7 @@ public void close() {
     @Override
     public void ack(Object msgId) {
         if (msgId instanceof Message) {
-            Message msg = (Message) msgId;
+            Message<?> msg = (Message<?>) msgId;
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Received ack for message {}", spoutId, 
msg.getMessageId());
             }
@@ -128,7 +158,8 @@ public void ack(Object msgId) {
     @Override
     public void fail(Object msgId) {
         if (msgId instanceof Message) {
-            Message msg = (Message) msgId;
+            @SuppressWarnings("unchecked")
+            Message<byte[]> msg = (Message<byte[]>) msgId;
             MessageId id = msg.getMessageId();
             LOG.warn("[{}] Error processing message {}", spoutId, id);
 
@@ -160,7 +191,7 @@ public void fail(Object msgId) {
     public void nextTuple() {
         emitNextAvailableTuple();
     }
-    
+
     /**
      * It makes sure that it emits next available non-tuple to topology unless 
consumer queue doesn't have any message
      * available. It receives message from consumer queue and converts it to 
tuple and emits to topology. if the
@@ -168,7 +199,7 @@ public void nextTuple() {
      * emit.
      */
     public void emitNextAvailableTuple() {
-        Message msg;
+        Message<byte[]> msg;
 
         // check if there are any failed messages to re-emit in the topology
         msg = failedMessages.peek();
@@ -219,13 +250,15 @@ public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collect
         pendingMessageRetries.clear();
         failedMessages.clear();
         try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, 
pulsarSpoutConf.getServiceUrl(), clientConf);
+            sharedPulsarClient = SharedPulsarClient.get(componentId, 
clientConf);
             if (pulsarSpoutConf.isSharedConsumerEnabled()) {
-                consumer = 
sharedPulsarClient.getSharedConsumer(pulsarSpoutConf.getTopic(),
-                        pulsarSpoutConf.getSubscriptionName(), consumerConf);
+                consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
             } else {
-                consumer = 
sharedPulsarClient.getClient().subscribe(pulsarSpoutConf.getTopic(),
-                        pulsarSpoutConf.getSubscriptionName(), consumerConf);
+                try {
+                    consumer = 
sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
             }
             LOG.info("[{}] Created a pulsar consumer on topic {} to receive 
messages with subscription {}", spoutId,
                     pulsarSpoutConf.getTopic(), 
pulsarSpoutConf.getSubscriptionName());
@@ -244,7 +277,7 @@ public void declareOutputFields(OutputFieldsDeclarer 
declarer) {
 
     }
 
-    private boolean mapToValueAndEmit(Message msg) {
+    private boolean mapToValueAndEmit(Message<byte[]> msg) {
         if (msg != null) {
             Values values = 
pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
             ++pendingAcks;
diff --git 
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java 
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 86740774d..4506e11a7 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -18,36 +18,37 @@
  */
 package org.apache.pulsar.storm;
 
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 
 public class SharedPulsarClient {
     private static final Logger LOG = 
LoggerFactory.getLogger(SharedPulsarClient.class);
     private static final ConcurrentMap<String, SharedPulsarClient> instances = 
Maps.newConcurrentMap();
 
     private final String componentId;
-    private final PulsarClient client;
+    private final PulsarClientImpl client;
     private final AtomicInteger counter = new AtomicInteger();
 
-    private Consumer consumer;
-    private Producer producer;
+    private Consumer<byte[]> consumer;
+    private Producer<byte[]> producer;
 
-    private SharedPulsarClient(String componentId, String serviceUrl, 
ClientConfiguration clientConf)
+    private SharedPulsarClient(String componentId, ClientConfigurationData 
clientConf)
             throws PulsarClientException {
-        this.client = PulsarClient.create(serviceUrl, clientConf);
+        this.client = new PulsarClientImpl(clientConf);
         this.componentId = componentId;
     }
 
@@ -62,13 +63,13 @@ private SharedPulsarClient(String componentId, String 
serviceUrl, ClientConfigur
      * @return
      * @throws PulsarClientException
      */
-    public static SharedPulsarClient get(String componentId, String 
serviceUrl, ClientConfiguration clientConf)
+    public static SharedPulsarClient get(String componentId, 
ClientConfigurationData clientConf)
             throws PulsarClientException {
         AtomicReference<PulsarClientException> exception = new 
AtomicReference<PulsarClientException>();
         instances.computeIfAbsent(componentId, pulsarClient -> {
             SharedPulsarClient sharedPulsarClient = null;
             try {
-                sharedPulsarClient = new SharedPulsarClient(componentId, 
serviceUrl, clientConf);
+                sharedPulsarClient = new SharedPulsarClient(componentId, 
clientConf);
                 LOG.info("[{}] Created a new Pulsar Client.", componentId);
             } catch (PulsarClientException e) {
                 exception.set(e);
@@ -81,33 +82,41 @@ public static SharedPulsarClient get(String componentId, 
String serviceUrl, Clie
         return instances.get(componentId);
     }
 
-    public PulsarClient getClient() {
+    public PulsarClientImpl getClient() {
         counter.incrementAndGet();
         return client;
     }
 
-    public Consumer getSharedConsumer(String topic, String subscription, 
ConsumerConfiguration consumerConf)
+    public Consumer<byte[]> 
getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
             throws PulsarClientException {
         counter.incrementAndGet();
         synchronized (this) {
             if (consumer == null) {
-                consumer = client.subscribe(topic, subscription, consumerConf);
-                LOG.info("[{}] Created a new Pulsar Consumer on {}", 
componentId, topic);
+                try {
+                    consumer = client.subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
+                LOG.info("[{}] Created a new Pulsar Consumer on {}", 
componentId, consumerConf.getSingleTopic());
             } else {
-                LOG.info("[{}] Using a shared consumer on {}", componentId, 
topic);
+                LOG.info("[{}] Using a shared consumer on {}", componentId, 
consumerConf.getSingleTopic());
             }
         }
         return consumer;
     }
 
-    public Producer getSharedProducer(String topic, ProducerConfiguration 
producerConf) throws PulsarClientException {
+    public Producer<byte[]> getSharedProducer(ProducerConfigurationData 
producerConf) throws PulsarClientException {
         counter.incrementAndGet();
         synchronized (this) {
             if (producer == null) {
-                producer = client.createProducer(topic, producerConf);
-                LOG.info("[{}] Created a new Pulsar Producer on {}", 
componentId, topic);
+                try {
+                    producer = client.createProducerAsync(producerConf).join();
+                } catch (CompletionException e) {
+                    throw (PulsarClientException) e.getCause();
+                }
+                LOG.info("[{}] Created a new Pulsar Producer on {}", 
componentId, producerConf.getTopicName());
             } else {
-                LOG.info("[{}] Using a shared producer on {}", componentId, 
topic);
+                LOG.info("[{}] Using a shared producer on {}", componentId, 
producerConf.getTopicName());
             }
         }
         return producer;
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index f07eeba7f..e508412e6 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.testclient;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
@@ -36,16 +34,14 @@
 
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,33 +193,33 @@ public static void main(String[] args) throws Exception {
 
         final RateLimiter limiter = arguments.rate > 0 ? 
RateLimiter.create(arguments.rate) : null;
 
-        MessageListener listener = new MessageListener() {
-            public void received(Consumer consumer, Message msg) {
-                messagesReceived.increment();
-                bytesReceived.add(msg.getData().length);
+        MessageListener<byte[]> listener = (consumer, msg) -> {
+            messagesReceived.increment();
+            bytesReceived.add(msg.getData().length);
 
-                if (limiter != null) {
-                    limiter.acquire();
-                }
+            if (limiter != null) {
+                limiter.acquire();
+            }
 
-                long latencyMillis = System.currentTimeMillis() - 
msg.getPublishTime();
-                recorder.recordValue(latencyMillis);
-                cumulativeRecorder.recordValue(latencyMillis);
+            long latencyMillis = System.currentTimeMillis() - 
msg.getPublishTime();
+            recorder.recordValue(latencyMillis);
+            cumulativeRecorder.recordValue(latencyMillis);
 
-                consumer.acknowledgeAsync(msg);
-            }
+            consumer.acknowledgeAsync(msg);
         };
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .statsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, 
arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-        PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, 
clientConf);
+
+        PulsarClient pulsarClient = clientBuilder.build();
 
         class EncKeyReader implements CryptoKeyReader {
 
@@ -246,16 +242,17 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
                 return null;
             }
         }
+
         List<Future<Consumer<byte[]>>> futures = Lists.newArrayList();
-        ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
-        consumerConfig.setMessageListener(listener);
-        consumerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
-        consumerConfig.setSubscriptionType(arguments.subscriptionType);
+        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() //
+                .messageListener(listener) //
+                .receiverQueueSize(arguments.receiverQueueSize) //
+                .subscriptionType(arguments.subscriptionType);
 
         if (arguments.encKeyName != null) {
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
             EncKeyReader keyReader = new EncKeyReader(pKey);
-            consumerConfig.setCryptoKeyReader(keyReader);
+            consumerBuilder.cryptoKeyReader(keyReader);
         }
 
         for (int i = 0; i < arguments.numTopics; i++) {
@@ -271,7 +268,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
                     subscriberName = arguments.subscriberName;
                 }
 
-                futures.add(pulsarClient.subscribeAsync(topicName.toString(), 
subscriberName, consumerConfig));
+                
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
+                        .subscribeAsync());
             }
         }
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 681eb3d27..41dfa9814 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -42,16 +42,14 @@
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramLogWriter;
 import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,15 +231,17 @@ public static void main(String[] args) throws Exception {
         String prefixTopicName = arguments.topics.get(0);
         List<Future<Producer<byte[]>>> futures = Lists.newArrayList();
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS);
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .statsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, 
arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
 
         class EncKeyReader implements CryptoKeyReader {
 
@@ -264,27 +264,26 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
                 return null;
             }
         }
-        PulsarClient client = new PulsarClientImpl(arguments.serviceURL, 
clientConf);
-
-        ProducerConfiguration producerConf = new ProducerConfiguration();
-        producerConf.setSendTimeout(0, TimeUnit.SECONDS);
-        producerConf.setCompressionType(arguments.compression);
-        producerConf.setMaxPendingMessages(arguments.maxOutstanding);
-        // enable round robin message routing if it is a partitioned topic
-        
producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        PulsarClient client = clientBuilder.build();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                .compressionType(arguments.compression) //
+                .maxPendingMessages(arguments.maxOutstanding) //
+                // enable round robin message routing if it is a partitioned 
topic
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
         if (arguments.batchTime > 0) {
-            producerConf.setBatchingMaxPublishDelay(arguments.batchTime, 
TimeUnit.MILLISECONDS);
-            producerConf.setBatchingEnabled(true);
+            producerBuilder.batchingMaxPublishDelay(arguments.batchTime, 
TimeUnit.MILLISECONDS).enableBatching(true);
         }
 
         // Block if queue is full else we will start seeing errors in sendAsync
-        producerConf.setBlockIfQueueFull(true);
+        producerBuilder.blockIfQueueFull(true);
 
         if (arguments.encKeyName != null) {
-            producerConf.addEncryptionKey(arguments.encKeyName);
+            producerBuilder.addEncryptionKey(arguments.encKeyName);
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
             EncKeyReader keyReader = new EncKeyReader(pKey);
-            producerConf.setCryptoKeyReader(keyReader);
+            producerBuilder.cryptoKeyReader(keyReader);
         }
 
         for (int i = 0; i < arguments.numTopics; i++) {
@@ -292,7 +291,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
             log.info("Adding {} publishers on topic {}", 
arguments.numProducers, topic);
 
             for (int j = 0; j < arguments.numProducers; j++) {
-                futures.add(client.createProducerAsync(topic, producerConf));
+                
futures.add(producerBuilder.clone().topic(topic).createAsync());
             }
         }
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index c5e66b214..7a4fdf964 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -29,14 +29,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -173,7 +172,7 @@ public static void main(String[] args) throws Exception {
 
         final RateLimiter limiter = arguments.rate > 0 ? 
RateLimiter.create(arguments.rate) : null;
 
-        ReaderListener listener = (reader, msg) -> {
+        ReaderListener<byte[]> listener = (reader, msg) -> {
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
@@ -182,21 +181,21 @@ public static void main(String[] args) throws Exception {
             }
         };
 
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setConnectionsPerBroker(arguments.maxConnections);
-        clientConf.setStatsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS);
-        clientConf.setIoThreads(Runtime.getRuntime().availableProcessors());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .serviceUrl(arguments.serviceURL) //
+                .connectionsPerBroker(arguments.maxConnections) //
+                .statsInterval(arguments.statsIntervalSeconds, 
TimeUnit.SECONDS) //
+                .ioThreads(Runtime.getRuntime().availableProcessors()) //
+                .enableTls(arguments.useTls) //
+                .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
+
         if (isNotBlank(arguments.authPluginClassName)) {
-            clientConf.setAuthentication(arguments.authPluginClassName, 
arguments.authParams);
+            clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
         }
-        clientConf.setUseTls(arguments.useTls);
-        clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
-        PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, 
clientConf);
+
+        PulsarClient pulsarClient = clientBuilder.build();
 
         List<CompletableFuture<Reader<byte[]>>> futures = Lists.newArrayList();
-        ReaderConfiguration readerConfig = new ReaderConfiguration();
-        readerConfig.setReaderListener(listener);
-        readerConfig.setReceiverQueueSize(arguments.receiverQueueSize);
 
         MessageId startMessageId;
         if ("earliest".equals(arguments.startMessageId)) {
@@ -208,11 +207,16 @@ public static void main(String[] args) throws Exception {
             startMessageId = new MessageIdImpl(Long.parseLong(parts[0]), 
Long.parseLong(parts[1]), -1);
         }
 
+        ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() //
+                .readerListener(listener) //
+                .receiverQueueSize(arguments.receiverQueueSize) //
+                .startMessageId(startMessageId);
+
         for (int i = 0; i < arguments.numTopics; i++) {
             final TopicName topicName = (arguments.numTopics == 1) ? 
prefixTopicName
                     : TopicName.get(String.format("%s-%d", prefixTopicName, 
i));
 
-            futures.add(pulsarClient.createReaderAsync(topicName.toString(), 
startMessageId, readerConfig));
+            
futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
         }
 
         FutureUtil.waitForAll(futures).get();
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 6573477d4..b917dc244 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -36,7 +36,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -176,30 +176,33 @@ public synchronized PulsarClient getPulsarClient() throws 
IOException {
     }
 
     private PulsarClient createClientInstance(ClusterData clusterData) throws 
IOException {
-        ClientConfiguration clientConf = new ClientConfiguration();
-        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
-        clientConf.setUseTls(config.isTlsEnabled());
-        
clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
-        
clientConf.setTlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
-        clientConf.setIoThreads(config.getWebSocketNumIoThreads());
-        
clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker());
+        ClientBuilder clientBuilder = PulsarClient.builder() //
+                .statsInterval(0, TimeUnit.SECONDS) //
+                .enableTls(config.isTlsEnabled()) //
+                
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
+                
.tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) //
+                .ioThreads(config.getWebSocketNumIoThreads()) //
+                
.connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
 
         if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
                 && 
isNotBlank(config.getBrokerClientAuthenticationParameters())) {
-            
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
+            
clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
                     config.getBrokerClientAuthenticationParameters());
         }
 
         if (config.isTlsEnabled()) {
             if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
-                return 
PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf);
+                clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
             } else if (isNotBlank(clusterData.getServiceUrlTls())) {
-                return PulsarClient.create(clusterData.getServiceUrlTls(), 
clientConf);
+                clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
             }
         } else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
-            return PulsarClient.create(clusterData.getBrokerServiceUrl(), 
clientConf);
+            clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
+        } else {
+            clientBuilder.serviceUrl(clusterData.getServiceUrl());
         }
-        return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
+
+        return clientBuilder.build();
     }
 
     private static ClusterData createClusterData(WebSocketProxyConfiguration 
config) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to