This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cb5eb5b  PIP-12 Introduce builder for creating Producer Consumer 
Reader (#1089)
cb5eb5b is described below

commit cb5eb5b671316bf200202b5b410b1438f2cde98b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Feb 17 12:22:47 2018 -0800

    PIP-12 Introduce builder for creating Producer Consumer Reader (#1089)
    
    * PIP-12: Introduce builders for creating Producer Consumer Reader
    
    * Fixed Javadocs
    
    * Addressed comments
---
 .../pulsar/broker/service/AbstractReplicator.java  |  21 +-
 .../nonpersistent/NonPersistentReplicator.java     |   2 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   6 +-
 .../apache/pulsar/client/api/ClientBuilder.java    | 265 ++++++++++++++++++++
 .../pulsar/client/api/ClientConfiguration.java     |  10 +-
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 245 +++++++++++++++++++
 .../pulsar/client/api/ConsumerConfiguration.java   |   3 +-
 .../apache/pulsar/client/api/CryptoKeyReader.java  |  39 ++-
 .../apache/pulsar/client/api/MessageRouter.java    |   3 +-
 .../pulsar/client/api/MessageRoutingMode.java}     |  22 +-
 .../apache/pulsar/client/api/ProducerBuilder.java  | 272 +++++++++++++++++++++
 .../pulsar/client/api/ProducerConfiguration.java   |  23 +-
 .../org/apache/pulsar/client/api/PulsarClient.java |  81 +++++-
 .../apache/pulsar/client/api/ReaderBuilder.java    | 140 +++++++++++
 .../pulsar/client/api/ReaderConfiguration.java     |  10 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      | 154 ++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 176 +++++++++++++
 .../pulsar/client/impl/ProducerBuilderImpl.java    | 194 +++++++++++++++
 .../pulsar/client/impl/PulsarClientImpl.java       |  23 +-
 .../pulsar/client/impl/ReaderBuilderImpl.java      | 132 ++++++++++
 .../apache/pulsar/client/api/MessageIdTest.java    |  16 +-
 .../MessageIdTest.java => impl/BuildersTest.java}  |  44 ++--
 .../pulsar/client/tutorial/SampleProducer.java     |   6 +-
 23 files changed, 1769 insertions(+), 118 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1913dd5..49213c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -22,10 +22,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.AbstractReplicator.State;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -43,7 +43,7 @@ public abstract class AbstractReplicator {
     protected volatile ProducerImpl producer;
 
     protected final int producerQueueSize;
-    protected final ProducerConfiguration producerConfiguration;
+    protected final ProducerBuilder producerBuilder;
 
     protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);
 
@@ -68,10 +68,11 @@ public abstract class AbstractReplicator {
         this.producer = null;
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
-        this.producerConfiguration = new ProducerConfiguration();
-        this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
-        this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
-        
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, 
localCluster));
+        this.producerBuilder = client.newProducer() //
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                .maxPendingMessages(producerQueueSize) //
+                .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
         STATE_UPDATER.set(this, State.Stopped);
     }
 
@@ -83,10 +84,6 @@ public abstract class AbstractReplicator {
 
     protected abstract void disableReplicatorRead();
 
-    public ProducerConfiguration getProducerConfiguration() {
-        return producerConfiguration;
-    }
-
     public String getRemoteCluster() {
         return remoteCluster;
     }
@@ -121,7 +118,7 @@ public abstract class AbstractReplicator {
         }
 
         log.info("[{}][{} -> {}] Starting replicator", topicName, 
localCluster, remoteCluster);
-        client.createProducerAsync(topicName, 
producerConfiguration).thenAccept(producer -> {
+        producerBuilder.createAsync().thenAccept(producer -> {
             readEntries(producer);
         }).exceptionally(ex -> {
             if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index d914aa7..e9219a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -52,7 +52,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
             BrokerService brokerService) {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
 
-        producerConfiguration.setBlockIfQueueFull(false);
+        producerBuilder.blockIfQueueFull(false);
 
         startProducer();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 55019a0..a8ec940 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1149,17 +1149,17 @@ public class PersistentTopicTest {
         brokerService.getReplicationClients().put(remoteCluster, client);
         PersistentReplicator replicator = new PersistentReplicator(topic, 
cursor, localCluster, remoteCluster, brokerService);
 
-        doReturn(new 
CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName,
 replicator.getProducerConfiguration());
+        doReturn(new 
CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName),
 any());
 
         replicator.startProducer();
-        verify(clientImpl).createProducerAsync(globalTopicName, 
replicator.getProducerConfiguration());
+        verify(clientImpl).createProducerAsync(matches(globalTopicName), 
any());
 
         replicator.disconnect(false);
         replicator.disconnect(false);
 
         replicator.startProducer();
 
-        verify(clientImpl, 
Mockito.times(2)).createProducerAsync(globalTopicName, 
replicator.getProducerConfiguration());
+        verify(clientImpl, 
Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
     }
 
     @Test
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
new file mode 100644
index 0000000..b832e59
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+
+/**
+ * Builder interface that is used to construct a {@link PulsarClient} instance.
+ *
+ * @since 2.0.0
+ */
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    /**
+     * @return the new {@link PulsarClient} instance
+     */
+    PulsarClient build() throws PulsarClientException;
+
+    /**
+     * Create a copy of the current client builder.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ClientBuilder builder = 
PulsarClient.builder().ioThreads(8).listenerThreads(4);
+     *
+     * PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
+     * PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
+     * </pre>
+     */
+    ClientBuilder clone();
+
+    /**
+     * Configure the service URL for the Pulsar service.
+     * <p>
+     * This parameter is required
+     *
+     * @param serviceUrl
+     * @return
+     */
+    ClientBuilder serviceUrl(String serviceUrl);
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map<String, String> conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * Authentication auth = AuthenticationFactory.create(AUTH_CLASS, conf);
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(auth)
+     *          .build();
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authentication
+     *            an instance of the {@link Authentication} provider already 
constructed
+     */
+    ClientBuilder authentication(Authentication authentication);
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     * String AUTH_PARAMS = 
"tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, AUTH_PARAMS)
+     *          .build();
+     * ....
+     * </code>
+     * </pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParamsString
+     *            string which represents parameters for the 
Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    ClientBuilder authentication(String authPluginClassName, String 
authParamsString)
+            throws UnsupportedAuthenticationException;
+
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     * <p>
+     * Example:
+     * <p>
+     *
+     * <pre>
+     * <code>
+     * String AUTH_CLASS = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map<String, String> conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, conf)
+     *          .build();
+     * ....
+     * </code></pre>
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    ClientBuilder authentication(String authPluginClassName, Map<String, 
String> authParams)
+            throws UnsupportedAuthenticationException;
+
+    /**
+     * Set the operation timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * Producer-create, subscribe and unsubscribe operations will be retried 
until this interval, after which the
+     * operation will be marked as failed
+     *
+     * @param operationTimeout
+     *            operation timeout
+     * @param unit
+     *            time unit for {@code operationTimeout}
+     */
+    ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
+
+    /**
+     * Set the number of threads to be used for handling connections to 
brokers <i>(default: 1 thread)</i>
+     *
+     * @param numIoThreads
+     */
+    ClientBuilder ioThreads(int numIoThreads);
+
+    /**
+     * Set the number of threads to be used for message listeners <i>(default: 
1 thread)</i>
+     *
+     * @param numListenerThreads
+     */
+    ClientBuilder listenerThreads(int numListenerThreads);
+
+    /**
+     * Sets the max number of connections that the client library will open to 
a single broker.
+     * <p>
+     * By default, the connection pool will use a single connection for all 
the producers and consumers. Increasing this
+     * parameter may improve throughput when using many producers over a high 
latency connection.
+     * <p>
+     *
+     * @param connectionsPerBroker
+     *            max number of connections per broker (needs to be greater 
than 0)
+     */
+    ClientBuilder connectionsPerBroker(int connectionsPerBroker);
+
+    /**
+     * Configure whether to use TCP no-delay flag on the connection, to 
disable Nagle algorithm.
+     * <p>
+     * No-delay features make sure packets are sent out on the network as soon 
as possible, and it's critical to achieve
+     * low latency publishes. On the other hand, sending out a huge number of 
small packets might limit the overall
+     * throughput, so if latency is not a concern, it's advisable to set the 
<code>useTcpNoDelay</code> flag to false.
+     * <p>
+     * Default value is true
+     *
+     * @param enableTcpNoDelay
+     */
+    ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
+
+    /**
+     * Configure whether to use TLS encryption on the connection <i>(default: 
false)</i>
+     *
+     * @param enableTls
+     */
+    ClientBuilder enableTls(boolean enableTls);
+
+    /**
+     * Set the path to the trusted TLS certificate file
+     *
+     * @param tlsTrustCertsFilePath
+     */
+    ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
+
+    /**
+     * Configure whether the Pulsar client accepts untrusted TLS certificates 
from the broker <i>(default: false)</i>
+     *
+     * @param allowTlsInsecureConnection
+     */
+    ClientBuilder allowTlsInsecureConnection(boolean 
allowTlsInsecureConnection);
+
+    /**
+     * It allows to validate hostname verification when client connects to 
broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected 
broker's host name. It follows RFC 2818, 3.1.
+     * Server Identity hostname verification.
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     *
+     * @param enableTlsHostnameVerification
+     */
+    ClientBuilder enableTlsHostnameVerification(boolean 
enableTlsHostnameVerification);
+
+    /**
+     * Set the interval between each stat info <i>(default: 60 seconds)</i>. 
Stats will be activated with a positive
+     * statsInterval. It should be set to at least 1 second.
+     *
+     * @param statsInterval
+     *            the interval between each stat info
+     * @param unit
+     *            time unit for {@code statsInterval}
+     */
+    ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
+
+    /**
+     * Number of concurrent lookup-requests allowed on each broker-connection 
to prevent overload on broker.
+     * <i>(default: 5000)</i> It should be configured with a higher value only 
when it is required to produce/subscribe
+     * on thousands of topics using the created {@link PulsarClient}
+     *
+     * @param maxConcurrentLookupRequests
+     */
+    ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
+
+    /**
+     * Set the max number of broker-rejected requests in a certain time-frame (30 
seconds) after which the current connection
+     * will be closed and the client creates a new connection, giving it a chance to 
connect to a different broker <i>(default:
+     * 50)</i>
+     *
+     * @param maxNumberOfRejectedRequestPerConnection
+     */
+    ClientBuilder maxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRequestPerConnection);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 9ab7b32..14f94da 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -30,14 +30,13 @@ import 
org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 /**
  * Class used to specify client side configuration like authentication, etc..
  *
- *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a 
new {@link PulsarClient} instance
  */
+@Deprecated
 public class ClientConfiguration implements Serializable {
 
-    /**
-     *
-     */
     private static final long serialVersionUID = 1L;
+
     private Authentication authentication = new AuthenticationDisabled();
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
@@ -221,8 +220,7 @@ public class ClientConfiguration implements Serializable {
      *            max number of connections per broker (needs to be greater 
than 0)
      */
     public void setConnectionsPerBroker(int connectionsPerBroker) {
-        checkArgument(connectionsPerBroker > 0,
-                "Connections per broker need to be greater than 0");
+        checkArgument(connectionsPerBroker > 0, "Connections per broker need 
to be greater than 0");
         this.connectionsPerBroker = connectionsPerBroker;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
new file mode 100644
index 0000000..a2c3c81
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ConsumerBuilder} is used to configure and create instances of {@link 
Consumer}.
+ *
+ * @see PulsarClient#newConsumer()
+ *
+ * @since 2.0.0
+ */
+public interface ConsumerBuilder extends Serializable, Cloneable {
+
+    /**
+     * Create a copy of the current consumer builder.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ConsumerBuilder builder = client.newConsumer() //
+     *         .subscriptionName("my-subscription-name") //
+     *         .subscriptionType(SubscriptionType.Shared) //
+     *         .receiverQueueSize(10);
+     *
+     * Consumer consumer1 = builder.clone().topic(TOPIC_1).subscribe();
+     * Consumer consumer2 = builder.clone().topic(TOPIC_2).subscribe();
+     * </pre>
+     */
+    ConsumerBuilder clone();
+
+    /**
+     * Finalize the {@link Consumer} creation by subscribing to the topic.
+     *
+     * <p>
+     * If the subscription does not exist, a new subscription will be created 
and all messages published after the
+     * creation will be retained until acknowledged, even if the consumer is 
not connected.
+     *
+     * @return the {@link Consumer} instance
+     * @throws PulsarClientException
+     *             if the subscribe operation fails
+     */
+    Consumer subscribe() throws PulsarClientException;
+
+    /**
+     * Finalize the {@link Consumer} creation by subscribing to the topic in 
asynchronous mode.
+     *
+     * <p>
+     * If the subscription does not exist, a new subscription will be created 
and all messages published after the
+     * creation will be retained until acknowledged, even if the consumer is 
not connected.
+     *
+     * @return a future that will yield a {@link Consumer} instance
+     * @throws PulsarClientException
+     *             if the subscribe operation fails
+     */
+    CompletableFuture<Consumer> subscribeAsync();
+
+    /**
+     * Specify the topic this consumer will subscribe on.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param topicName
+     */
+    ConsumerBuilder topic(String topicName);
+
+    /**
+     * Specify the subscription name for this consumer.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param subscriptionName
+     */
+    ConsumerBuilder subscriptionName(String subscriptionName);
+
+    /**
+     * Set the timeout for unacked messages, truncated to the nearest 
millisecond. The timeout needs to be greater than
+     * 10 seconds.
+     *
+     * @param ackTimeout
+     *            for unacked messages.
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return {@link ConsumerBuilder}
+     */
+    ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit);
+
+    /**
+     * Select the subscription type to be used when subscribing to the topic.
+     * <p>
+     * Default is {@link SubscriptionType#Exclusive}
+     *
+     * @param subscriptionType
+     *            the subscription type value
+     */
+    ConsumerBuilder subscriptionType(SubscriptionType subscriptionType);
+
+    /**
+     * Sets a {@link MessageListener} for the consumer
+     * <p>
+     * When a {@link MessageListener} is set, application will receive 
messages through it. Calls to
+     * {@link Consumer#receive()} will not be allowed.
+     *
+     * @param messageListener
+     *            the listener object
+     */
+    ConsumerBuilder messageListener(MessageListener messageListener);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The consumer action
+     */
+    ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be 
accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value 
could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * <p>
+     * <b>Setting the consumer queue size as zero</b>
+     * <ul>
+     * <li>Decreases the throughput of the consumer, by disabling pre-fetching 
of messages. This approach improves the
+     * message distribution on shared subscription, by pushing messages only 
to the consumers that are ready to process
+     * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned 
Topics can be used if the consumer queue
+     * size is zero. {@link Consumer#receive()} function call should not be 
interrupted when the consumer queue size is
+     * zero.</li>
+     * <li>Doesn't support Batch-Message: if consumer receives any 
batch-message then it closes consumer connection with
+     * broker and {@link Consumer#receive()} call will remain blocked while 
{@link Consumer#receiveAsync()} receives
+     * exception in callback. <b> consumer will not be able receive any 
further message unless batch-message in pipeline
+     * is removed</b></li>
+     * </ul>
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use 
cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    ConsumerBuilder receiverQueueSize(int receiverQueueSize);
+
+    /**
+     * Set the max total receiver queue size across partitions.
+     * <p>
+     * This setting will be used to reduce the receiver queue size for 
individual partitions
+     * {@link #receiverQueueSize(int)} if the total exceeds this value 
(default: 50000).
+     *
+     * @param maxTotalReceiverQueueSizeAcrossPartitions
+     */
+    ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int 
maxTotalReceiverQueueSizeAcrossPartitions);
+
+    /**
+     * Set the consumer name.
+     *
+     * @param consumerName
+     */
+    ConsumerBuilder consumerName(String consumerName);
+
+    /**
+     * If enabled, the consumer will read messages from the compacted topic 
rather than reading the full message backlog
+     * of the topic. This means that, if the topic has been compacted, the 
consumer will only see the latest value for
+     * each key in the topic, up until the point in the topic message backlog 
that has been compacted. Beyond that
+     * point, the messages will be sent as normal.
+     *
+     * readCompacted can only be enabled on subscriptions to persistent topics, 
which have a single active consumer (i.e.
+     * failover or exclusive subscriptions). Attempting to enable it on 
subscriptions to non-persistent topics or on a
+     * shared subscription, will lead to the subscription call throwing a 
PulsarClientException.
+     *
+     * @param readCompacted
+     *            whether to read from the compacted topic
+     */
+    ConsumerBuilder readCompacted(boolean readCompacted);
+
+    /**
+     * Sets priority level for the shared subscription consumers to which 
broker gives more priority while dispatching
+     * messages. Here, broker follows descending priorities. (eg: 
0=max-priority, 1, 2,..) <br>
+     * In Shared subscription mode, broker will first dispatch messages to max 
priority-level consumers if they have
+     * permits, else broker will consider next priority level consumers. <br>
+     * If subscription has consumer-A with priorityLevel 0 and Consumer-B with 
priorityLevel 1 then broker will dispatch
+     * messages to only consumer-A until it runs out of permits and then broker 
starts dispatching messages to Consumer-B.
+     *
+     * <pre>
+     * Consumer PriorityLevel Permits
+     * C1       0             2
+     * C2       0             1
+     * C3       0             1
+     * C4       1             2
+     * C5       1             1
+     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, 
C4, C5, C4
+     * </pre>
+     *
+     * @param priorityLevel
+     */
+    ConsumerBuilder priorityLevel(int priorityLevel);
+
+    /**
+     * Set a name/value property with this consumer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    ConsumerBuilder property(String key, String value);
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    ConsumerBuilder properties(Map<String, String> properties);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 00e4537..5f25fa8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -31,8 +31,9 @@ import java.util.concurrent.TimeUnit;
  * attach to the subscription. Other consumers will get an error message. In 
Shared subscription, multiple consumers
  * will be able to use the same subscription name and the messages will be 
dispatched in a round robin fashion.
  *
- *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a 
{@link Consumer} instance
  */
+@Deprecated
 public class ConsumerConfiguration implements Serializable {
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
index 46496b7..4acb6e9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
@@ -18,34 +18,33 @@
  */
 package org.apache.pulsar.client.api;
 
-import java.util.List;
 import java.util.Map;
 
 public interface CryptoKeyReader {
 
-    /*
+    /**
      * Return the encryption key corresponding to the key name in the argument
      * <p>
-     * This method should be implemented to return the EncryptionKeyInfo. This 
method will be
-     * called at the time of producer creation as well as consumer receiving 
messages.
-     * Hence, application should not make any blocking calls within the 
implementation. 
+     * This method should be implemented to return the EncryptionKeyInfo. This 
method will be called at the time of
+     * producer creation as well as consumer receiving messages. Hence, 
application should not make any blocking calls
+     * within the implementation.
      * <p>
-     * 
-    * @param keyName
-    *            Unique name to identify the key
-    * @param metadata
-    *            Additional information needed to identify the key
-    * @return EncryptionKeyInfo with details about the public key
-    * */
+     *
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @return EncryptionKeyInfo with details about the public key
+     */
     EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
metadata);
 
-    /*
-    * @param keyName
-    *            Unique name to identify the key
-    * @param metadata
-    *            Additional information needed to identify the key
-    * @return byte array of the private key value
-    */
+    /**
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @return EncryptionKeyInfo with details about the private key
+     */
     EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
metadata);
-    
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index 25c8975..a9cef6f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 public interface MessageRouter extends Serializable {
 
     /**
-     * 
+     *
      * @param msg
      *            Message object
      * @return The index of the partition to use for the message
@@ -42,7 +42,6 @@ public interface MessageRouter extends Serializable {
      * @return the partition to route the message.
      * @since 1.22.0
      */
-    @SuppressWarnings("deprecation")
     default int choosePartition(Message msg, TopicMetadata metadata) {
         return choosePartition(msg);
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
similarity index 53%
copy from 
pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
index 43b7b26..1d45489 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
@@ -16,24 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.tutorial;
+package org.apache.pulsar.client.api;
 
-import java.io.IOException;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-public class SampleProducer {
-    public static void main(String[] args) throws PulsarClientException, 
InterruptedException, IOException {
-        PulsarClient pulsarClient = 
PulsarClient.create("http://127.0.0.1:8080");
-
-        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
-
-        for (int i = 0; i < 10; i++) {
-            producer.send("my-message".getBytes());
-        }
-
-        pulsarClient.close();
-    }
+public enum MessageRoutingMode {
+    SinglePartition, RoundRobinPartition, CustomPartition
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
new file mode 100644
index 0000000..5ab1215
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
+
+/**
+ * {@link ProducerBuilder} is used to configure and create instances of {@link 
Producer}.
+ *
+ * @see PulsarClient#newProducer()
+ */
+public interface ProducerBuilder extends Serializable, Cloneable {
+
+    /**
+     * Finalize the creation of the {@link Producer} instance.
+     * <p>
+     * This method will block until the producer is created successfully.
+     *
+     * @return the producer instance
+     * @throws PulsarClientException.ProducerBusyException
+     *             if a producer with the same "producer name" is already 
connected to the topic
+     * @throws PulsarClientException
+     *             if the producer creation fails
+     */
+    Producer create() throws PulsarClientException;
+
+    /**
+     * Finalize the creation of the {@link Producer} instance in asynchronous 
mode.
+     * <p>
+     * This method will return a {@link CompletableFuture} that can be used to 
access the instance when it's ready.
+     *
+     * @return a future that will yield the created producer instance
+     * @throws PulsarClientException.ProducerBusyException
+     *             if a producer with the same "producer name" is already 
connected to the topic
+     * @throws PulsarClientException
+     *             if the producer creation fails
+     */
+    CompletableFuture<Producer> createAsync();
+
+    /**
+     * Create a copy of the current {@link ProducerBuilder}.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ProducerBuilder builder = client.newProducer().sendTimeout(10, 
TimeUnit.SECONDS).blockIfQueueFull(true);
+     *
+     * Producer producer1 = builder.clone().topic(TOPIC_1).create();
+     * Producer producer2 = builder.clone().topic(TOPIC_2).create();
+     * </pre>
+     */
+    ProducerBuilder clone();
+
+    /**
+     * Specify the topic this producer will be publishing on.
+     * <p>
+     * This argument is required when constructing the producer.
+     *
+     * @param topicName
+     */
+    ProducerBuilder topic(String topicName);
+
+    /**
+     * Specify a name for the producer
+     * <p>
+     * If not assigned, the system will generate a globally unique name which 
can be accessed with
+     * {@link Producer#getProducerName()}.
+     * <p>
+     * When specifying a name, it is up to the user to ensure that, for a 
given topic, the producer name is unique
+     * across all Pulsar's clusters. Brokers will enforce that only a single 
producer with a given name can be publishing on
+     * a topic.
+     *
+     * @param producerName
+     *            the custom name to use for the producer
+     */
+    ProducerBuilder producerName(String producerName);
+
+    /**
+     * Set the send timeout <i>(default: 30 seconds)</i>
+     * <p>
+     * If a message is not acknowledged by the server before the sendTimeout 
expires, an error will be reported.
+     *
+     * @param sendTimeout
+     *            the send timeout
+     * @param unit
+     *            the time unit of the {@code sendTimeout}
+     */
+    ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit);
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker.
+     * <p>
+     * When the queue is full, by default, all calls to {@link Producer#send} 
and {@link Producer#sendAsync} will fail
+     * unless blockIfQueueFull is set to true. Use {@link 
#blockIfQueueFull(boolean)} to change the blocking behavior.
+     *
+     * @param maxPendingMessages
+     * @return
+     */
+    ProducerBuilder maxPendingMessages(int maxPendingMessages);
+
+    /**
+     * Set the number of max pending messages across all the partitions
+     * <p>
+     * This setting will be used to lower the max pending messages for each 
partition
+     * ({@link #maxPendingMessages(int)}), if the total exceeds the configured 
value.
+     *
+     * @param maxPendingMessagesAcrossPartitions
+     */
+    ProducerBuilder maxPendingMessagesAcrossPartitions(int 
maxPendingMessagesAcrossPartitions);
+
+    /**
+     * Set whether the {@link Producer#send} and {@link Producer#sendAsync} 
operations should block when the outgoing
+     * message queue is full.
+     * <p>
+     * Default is <code>false</code>. If set to <code>false</code>, send 
operations will immediately fail with
+     * {@link ProducerQueueIsFullError} when there is no space left in pending 
queue.
+     *
+     * @param blockIfQueueFull
+     *            whether to block {@link Producer#send} and {@link 
Producer#sendAsync} operations on queue full
+     * @return
+     */
+    ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull);
+
+    /**
+     * Set the message routing mode for the partitioned producer
+     *
+     * @param messageRouteMode
+     * @return
+     */
+    ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode);
+
+    /**
+     * Set the compression type for the producer.
+     * <p>
+     * By default, message payloads are not compressed. Supported compression 
types are:
+     * <ul>
+     * <li><code>CompressionType.LZ4</code></li>
+     * <li><code>CompressionType.ZLIB</code></li>
+     * </ul>
+     *
+     * @param compressionType
+     * @return
+     */
+    ProducerBuilder compressionType(CompressionType compressionType);
+
+    /**
+     * Set a custom message routing policy by passing an implementation of 
MessageRouter
+     *
+     *
+     * @param messageRouter
+     */
+    ProducerBuilder messageRouter(MessageRouter messageRouter);
+
+    /**
+     * Control whether automatic batching of messages is enabled for the 
producer. <i>default: false [No batching]</i>
+     *
+     * When batching is enabled, multiple calls to Producer.sendAsync can 
result in a single batch to be sent to the
+     * broker, leading to better throughput, especially when publishing small 
messages. If compression is enabled,
+     * messages will be compressed at the batch level, leading to a much 
better compression ratio for similar headers or
+     * contents.
+     *
+     * When enabled default batch delay is set to 10 ms and default batch size 
is 1000 messages
+     *
+     * @see #batchingMaxPublishDelay(long, TimeUnit)
+     */
+    ProducerBuilder enableBatching(boolean enableBatching);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Add public encryption key, used by producer to encrypt the data key.
+     *
+     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to 
load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If 
compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message 
is encrypted.
+     *
+     */
+    ProducerBuilder addEncryptionKey(String key);
+
+    /**
+     * Sets the ProducerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The producer action
+     */
+    ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action);
+
+    /**
+     * Set the time period within which the messages sent will be batched 
<i>default: 10ms</i> if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until this 
time interval elapses or until the
+     * {@link #batchingMaxMessages(int)} threshold is reached; all messages 
will then be published as a single batch
+     * message. The consumer will be delivered individual messages in the 
batch in the same order they were enqueued.
+     *
+     * @see #batchingMaxMessages(int)
+     * @param batchDelay
+     *            the batch delay
+     * @param timeUnit
+     *            the time unit of the {@code batchDelay}
+     * @return
+     */
+    ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit 
timeUnit);
+
+    /**
+     * Set the maximum number of messages permitted in a batch. <i>default: 
1000</i> If set to a value greater than 1,
+     * messages will be queued until this threshold is reached or batch 
interval has elapsed. All messages in the batch
+     * will be published as a single batch message. The consumer will be 
delivered individual messages in the batch in
+     * the same order they were enqueued.
+     *
+     * @see #batchingMaxPublishDelay(long, TimeUnit)
+     * @param batchMessagesMaxMessagesPerBatch
+     *            maximum number of messages in a batch
+     * @return
+     */
+    ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the 
producer.
+     * <p>
+     * First message will be using (initialSequenceId + 1) as its sequence id 
and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     *
+     * @param initialSequenceId
+     * @return
+     */
+    ProducerBuilder initialSequenceId(long initialSequenceId);
+
+    /**
+     * Set a name/value property with this producer.
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    ProducerBuilder property(String key, String value);
+
+    /**
+     * Add all the properties in the provided map
+     *
+     * @param properties
+     * @return
+     */
+    ProducerBuilder properties(Map<String, String> properties);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index a6d2a55..be72fc7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -28,7 +28,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
 import com.google.common.base.Objects;
@@ -36,7 +35,9 @@ import com.google.common.base.Objects;
 /**
  * Producer's configuration
  *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and 
configure a {@link Producer} instance
  */
+@Deprecated
 public class ProducerConfiguration implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -268,7 +269,7 @@ public class ProducerConfiguration implements Serializable {
      *
      * @return message router.
      * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already 
passed as parameter in
-     * {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+     *             {@link MessageRouter#choosePartition(Message, 
TopicMetadata)}.
      * @see MessageRouter
      */
     @Deprecated
@@ -338,7 +339,7 @@ public class ProducerConfiguration implements Serializable {
      * @return encryptionKeys
      *
      */
-    public  ConcurrentOpenHashSet<String> getEncryptionKeys() {
+    public ConcurrentOpenHashSet<String> getEncryptionKeys() {
         return this.encryptionKeys;
     }
 
@@ -354,16 +355,15 @@ public class ProducerConfiguration implements 
Serializable {
     /**
      * Add public encryption key, used by producer to encrypt the data key.
      *
-     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys.
-     * If keys are found, a callback getKey(String keyName) is invoked against 
each key to load
-     * the values of the key. Application should implement this callback to 
return the key in pkcs8 format.
-     * If compression is enabled, message is encrypted after compression.
-     * If batch messaging is enabled, the batched message is encrypted.
+     * At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys. If keys are
+     * found, a callback getKey(String keyName) is invoked against each key to 
load the values of the key. Application
+     * should implement this callback to return the key in pkcs8 format. If 
compression is enabled, message is encrypted
+     * after compression. If batch messaging is enabled, the batched message 
is encrypted.
      *
      */
     public void addEncryptionKey(String key) {
         if (this.encryptionKeys == null) {
-            this.encryptionKeys = new ConcurrentOpenHashSet<String>(16,1);
+            this.encryptionKeys = new ConcurrentOpenHashSet<String>(16, 1);
         }
         this.encryptionKeys.add(key);
     }
@@ -377,7 +377,8 @@ public class ProducerConfiguration implements Serializable {
     /**
      * Sets the ProducerCryptoFailureAction to the value specified
      *
-     * @param The producer action
+     * @param action
+     *            The producer action
      */
     public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
         cryptoFailureAction = action;
@@ -467,6 +468,7 @@ public class ProducerConfiguration implements Serializable {
 
     /**
      * Set a name/value property with this producer.
+     *
      * @param key
      * @param value
      * @return
@@ -480,6 +482,7 @@ public class ProducerConfiguration implements Serializable {
 
     /**
      * Add all the properties in the provided map
+     *
      * @param properties
      * @return
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index e15db40..6ba2518 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -21,16 +21,29 @@ package org.apache.pulsar.client.api;
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 
 /**
- * Class that provides a client interface to Pulsar
- *
- *
+ * Class that provides a client interface to Pulsar.
+ * <p>
+ * Client instances are thread-safe and can be reused for managing multiple 
{@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
  */
 public interface PulsarClient extends Closeable {
 
     /**
+     * Get a new builder instance that can be used to configure and build a 
{@link PulsarClient} instance.
+     *
+     * @return the {@link ClientBuilder}
+     *
+     * @since 2.0.0
+     */
+    public static ClientBuilder builder() {
+        return new ClientBuilderImpl();
+    }
+
+    /**
      * Create a new PulsarClient object using default client configuration
      *
      * @param serviceUrl
@@ -38,7 +51,9 @@ public interface PulsarClient extends Closeable {
      * @return a new pulsar client object
      * @throws PulsarClientException.InvalidServiceURL
      *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
      */
+    @Deprecated
     public static PulsarClient create(String serviceUrl) throws 
PulsarClientException {
         return create(serviceUrl, new ClientConfiguration());
     }
@@ -53,12 +68,51 @@ public interface PulsarClient extends Closeable {
      * @return a new pulsar client object
      * @throws PulsarClientException.InvalidServiceURL
      *             if the serviceUrl is invalid
+     * @deprecated use {@link #builder()} to construct a client instance
      */
+    @Deprecated
     public static PulsarClient create(String serviceUrl, ClientConfiguration 
conf) throws PulsarClientException {
         return new PulsarClientImpl(serviceUrl, conf);
     }
 
     /**
+     * Create a producer with default configuration for publishing on a specific topic
+     * <p>
+     * Example:
+     *
+     * <code>
+     * Producer producer = client.newProducer().topic(myTopic).create();
+     * </code>
+     *
+     *
+     * @return a {@link ProducerBuilder} object to configure and construct the 
{@link Producer} instance
+     *
+     * @since 2.0.0
+     */
+    ProducerBuilder newProducer();
+
+    /**
+     * Create a consumer with default configuration for subscribing on a specific topic
+     *
+     * @return a {@link ConsumerBuilder} object to configure and construct the 
{@link Consumer} instance
+     *
+     * @since 2.0.0
+     */
+    ConsumerBuilder newConsumer();
+
+    /**
+     * Create a topic reader for reading messages from the specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual 
positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     *
+     * @return a {@link ReaderBuilder} that can be used to configure and 
construct a {@link Reader} instance
+     *
+     * @since 2.0.0
+     */
+    ReaderBuilder newReader();
+
+    /**
      * Create a producer with default {@link ProducerConfiguration} for 
publishing on a specific topic
      *
      * @param topic
@@ -72,7 +126,9 @@ public interface PulsarClient extends Closeable {
      *             if there was an error with the supplied credentials
      * @throws PulsarClientException.AuthorizationException
      *             if the authorization to publish on topic was denied
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     Producer createProducer(String topic) throws PulsarClientException;
 
     /**
@@ -81,7 +137,9 @@ public interface PulsarClient extends Closeable {
      * @param topic
      *            The name of the topic where to produce
      * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     CompletableFuture<Producer> createProducerAsync(String topic);
 
     /**
@@ -95,7 +153,9 @@ public interface PulsarClient extends Closeable {
      * @throws PulsarClientException
      *             if it was not possible to create the producer
      * @throws InterruptedException
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     Producer createProducer(String topic, ProducerConfiguration conf) throws 
PulsarClientException;
 
     /**
@@ -106,7 +166,9 @@ public interface PulsarClient extends Closeable {
      * @param conf
      *            The {@code ProducerConfiguration} object
      * @return Future of the asynchronously created producer object
+     * @deprecated use {@link #newProducer()} to build a new producer
      */
+    @Deprecated
     CompletableFuture<Producer> createProducerAsync(String topic, 
ProducerConfiguration conf);
 
     /**
@@ -119,7 +181,10 @@ public interface PulsarClient extends Closeable {
      * @return The {@code Consumer} object
      * @throws PulsarClientException
      * @throws InterruptedException
+     *
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     Consumer subscribe(String topic, String subscription) throws 
PulsarClientException;
 
     /**
@@ -131,7 +196,9 @@ public interface PulsarClient extends Closeable {
      * @param subscription
      *            The subscription name
      * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     CompletableFuture<Consumer> subscribeAsync(String topic, String 
subscription);
 
     /**
@@ -145,7 +212,9 @@ public interface PulsarClient extends Closeable {
      *            The {@code ConsumerConfiguration} object
      * @return The {@code Consumer} object
      * @throws PulsarClientException
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     Consumer subscribe(String topic, String subscription, 
ConsumerConfiguration conf) throws PulsarClientException;
 
     /**
@@ -159,7 +228,9 @@ public interface PulsarClient extends Closeable {
      * @param conf
      *            The {@code ConsumerConfiguration} object
      * @return Future of the {@code Consumer} object
+     * @deprecated Use {@link #newConsumer()} to build a new consumer
      */
+    @Deprecated
     CompletableFuture<Consumer> subscribeAsync(String topic, String 
subscription, ConsumerConfiguration conf);
 
     /**
@@ -185,7 +256,9 @@ public interface PulsarClient extends Closeable {
      * @param conf
      *            The {@code ReaderConfiguration} object
      * @return The {@code Reader} object
+     * @deprecated Use {@link #newReader()} to build a new reader
      */
+    @Deprecated
     Reader createReader(String topic, MessageId startMessageId, 
ReaderConfiguration conf) throws PulsarClientException;
 
     /**
@@ -212,7 +285,9 @@ public interface PulsarClient extends Closeable {
      * @param conf
      *            The {@code ReaderConfiguration} object
      * @return Future of the asynchronously created producer object
+     * @deprecated Use {@link #newReader()} to build a new reader
      */
+    @Deprecated
     CompletableFuture<Reader> createReaderAsync(String topic, MessageId 
startMessageId, ReaderConfiguration conf);
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
new file mode 100644
index 0000000..196af20
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ReaderBuilder} is used to configure and create instances of {@link 
Reader}.
+ *
+ * @see PulsarClient#newReader()
+ *
+ * @since 2.0.0
+ */
+public interface ReaderBuilder extends Serializable, Cloneable {
+
+    /**
+     * Finalize the creation of the {@link Reader} instance.
+     *
+     * <p>
+     * This method will block until the reader is created successfully.
+     *
+     * @return the reader instance
+     * @throws PulsarClientException
+     *             if the reader creation fails
+     */
+    Reader create() throws PulsarClientException;
+
+    /**
+     * Finalize the creation of the {@link Reader} instance in asynchronous 
mode.
+     *
+     * <p>
+     * This method will return a {@link CompletableFuture} that can be used to 
access the instance when it's ready.
+     *
+     * @return a future that will yield the created reader instance
+     * @throws PulsarClientException
+     *             if the reader creation fails
+     */
+    CompletableFuture<Reader> createAsync();
+
+    /**
+     * Create a copy of the current {@link ReaderBuilder}.
+     * <p>
+     * Cloning the builder can be used to share an incomplete configuration 
and specialize it multiple times. For
+     * example:
+     *
+     * <pre>
+     * ReaderBuilder builder = 
client.newReader().readerName("my-reader").receiverQueueSize(10);
+     *
+     * Reader reader1 = builder.clone().topic(TOPIC_1).create();
+     * Reader reader2 = builder.clone().topic(TOPIC_2).create();
+     * </pre>
+     */
+    ReaderBuilder clone();
+
+    /**
+     * Specify the topic this reader will read from.
+     * <p>
+     * This argument is required when constructing the reader.
+     *
+     * @param topicName
+     */
+    ReaderBuilder topic(String topicName);
+
+    /**
+     * The initial reader positioning is done by specifying a message id. The 
options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest 
message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end of the topic, 
only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the 
reader will position itself on that
+     * specific position. The first message to be read will be the message 
next to the specified messageId.
+     * </ul>
+     */
+    ReaderBuilder startMessageId(MessageId startMessageId);
+
+    /**
+     * Sets a {@link ReaderListener} for the reader
+     * <p>
+     * When a {@link ReaderListener} is set, application will receive messages 
through it. Calls to
+     * {@link Reader#readNext()} will not be allowed.
+     *
+     * @param readerListener
+     *            the listener object
+     */
+    ReaderBuilder readerListener(ReaderListener readerListener);
+
+    /**
+     * Sets a {@link CryptoKeyReader}
+     *
+     * @param cryptoKeyReader
+     *            CryptoKeyReader object
+     */
+    ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
+
+    /**
+     * Sets the ConsumerCryptoFailureAction to the value specified
+     *
+     * @param action
+     *            The action to take when the decoding fails
+     */
+    ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Sets the size of the consumer receive queue.
+     * <p>
+     * The consumer receive queue controls how many messages can be 
accumulated by the {@link Consumer} before the
+     * application calls {@link Consumer#receive()}. Using a higher value 
could potentially increase the consumer
+     * throughput at the expense of bigger memory utilization.
+     * </p>
+     * Default value is {@code 1000} messages and should be good for most use 
cases.
+     *
+     * @param receiverQueueSize
+     *            the new receiver queue size value
+     */
+    ReaderBuilder receiverQueueSize(int receiverQueueSize);
+
+    /**
+     * Set the reader name.
+     *
+     * @param readerName
+     */
+    ReaderBuilder readerName(String readerName);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
index 999e2e6..6f3816c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -23,6 +23,11 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
 
+/**
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure 
a {@link Reader} instance
+ */
+@Deprecated
 public class ReaderConfiguration implements Serializable {
 
     private int receiverQueueSize = 1000;
@@ -84,8 +89,9 @@ public class ReaderConfiguration implements Serializable {
 
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
-     * 
-     * @param The consumer action
+     *
+     * @param action
+     *            The action to take when the decoding fails
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
         cryptoFailureAction = action;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
new file mode 100644
index 0000000..2be5318
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+
+/**
+ * {@link ClientBuilder} implementation that accumulates its settings in a {@link ClientConfiguration} plus a
+ * service URL, and hands both to {@link PulsarClientImpl} when {@link #build()} is called.
+ * <p>
+ * Every setter stores the value (directly or through {@link #conf}) and returns {@code this} so calls can be
+ * chained.
+ */
+@SuppressWarnings("deprecation")
+public class ClientBuilderImpl implements ClientBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    // Package-private (not private) because BuildersTest reads these fields directly.
+    String serviceUrl;
+    final ClientConfiguration conf = new ClientConfiguration();
+
+    /**
+     * Creates the client from the settings collected so far.
+     *
+     * @return a new {@link PulsarClientImpl}
+     * @throws IllegalArgumentException
+     *             if no service URL was set on this builder
+     * @throws PulsarClientException
+     *             if the {@link PulsarClientImpl} constructor fails
+     */
+    @Override
+    public PulsarClient build() throws PulsarClientException {
+        if (serviceUrl == null) {
+            throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object");
+        }
+
+        return new PulsarClientImpl(serviceUrl, conf);
+    }
+
+    /**
+     * Returns a copy of this builder produced by {@link Object#clone()}.
+     * <p>
+     * The copy is shallow: {@link #serviceUrl} can be changed independently on the copy, but the copy refers to the
+     * same {@link #conf} instance as this builder.
+     * NOTE(review): settings stored in {@code conf} therefore look shared between the original and the clone;
+     * confirm whether the configuration should be copied as well.
+     */
+    @Override
+    public ClientBuilder clone() {
+        try {
+            return (ClientBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            // Keep the original exception as the cause so the failure remains diagnosable.
+            throw new RuntimeException("Failed to clone ClientBuilderImpl", e);
+        }
+    }
+
+    @Override
+    public ClientBuilder serviceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(Authentication authentication) {
+        conf.setAuthentication(authentication);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(String authPluginClassName, String authParamsString)
+            throws UnsupportedAuthenticationException {
+        conf.setAuthentication(authPluginClassName, authParamsString);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
+            throws UnsupportedAuthenticationException {
+        conf.setAuthentication(authPluginClassName, authParams);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
+        conf.setOperationTimeout(operationTimeout, unit);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder ioThreads(int numIoThreads) {
+        conf.setIoThreads(numIoThreads);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder listenerThreads(int numListenerThreads) {
+        conf.setListenerThreads(numListenerThreads);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
+        conf.setConnectionsPerBroker(connectionsPerBroker);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTcpNoDelay(boolean useTcpNoDelay) {
+        conf.setUseTcpNoDelay(useTcpNoDelay);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTls(boolean useTls) {
+        conf.setUseTls(useTls);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification) {
+        conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+        conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder allowTlsInsecureConnection(boolean tlsAllowInsecureConnection) {
+        conf.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
+        conf.setStatsInterval(statsInterval, unit);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder maxConcurrentLookupRequests(int concurrentLookupRequests) {
+        conf.setConcurrentLookupRequest(concurrentLookupRequests);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
+        conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
new file mode 100644
index 0000000..ab91326
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.FutureUtil;
+
+
+/**
+ * {@link ConsumerBuilder} implementation that collects the topic, the subscription name and a
+ * {@link ConsumerConfiguration}, then delegates to
+ * {@code PulsarClientImpl#subscribeAsync(topicName, subscriptionName, conf)}.
+ * <p>
+ * Every setter stores the value (directly or through the configuration object) and returns {@code this} so calls
+ * can be chained.
+ */
+@SuppressWarnings("deprecation")
+public class ConsumerBuilderImpl implements ConsumerBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+    private String topicName;
+    private String subscriptionName;
+    private final ConsumerConfiguration conf;
+
+    ConsumerBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ConsumerConfiguration();
+    }
+
+    /**
+     * Returns a copy of this builder produced by {@link Object#clone()}.
+     * <p>
+     * The copy is shallow: topic and subscription name can be changed independently on the copy, but it refers to
+     * the same configuration object and client as this builder.
+     * NOTE(review): confirm whether the configuration should be copied as well.
+     */
+    @Override
+    public ConsumerBuilder clone() {
+        try {
+            return (ConsumerBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            // Keep the original exception as the cause so the failure remains diagnosable.
+            throw new RuntimeException("Failed to clone ConsumerBuilderImpl", e);
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #subscribeAsync()}.
+     *
+     * @throws PulsarClientException
+     *             the failure cause itself when it already is a {@link PulsarClientException}, otherwise a new
+     *             {@link PulsarClientException} wrapping the cause (or the {@link InterruptedException})
+     */
+    @Override
+    public Consumer subscribe() throws PulsarClientException {
+        try {
+            return subscribeAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Subscribes using the values collected so far.
+     *
+     * @return the future returned by the client, or an already-failed future carrying an
+     *         {@link IllegalArgumentException} when the topic or the subscription name has not been set
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
+        }
+
+        if (subscriptionName == null) {
+            return FutureUtil.failedFuture(
+                    new IllegalArgumentException("Subscription name must be set on the consumer builder"));
+        }
+
+        return client.subscribeAsync(topicName, subscriptionName, conf);
+    }
+
+    @Override
+    public ConsumerBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder subscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) {
+        conf.setAckTimeout(ackTimeout, timeUnit);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder subscriptionType(SubscriptionType subscriptionType) {
+        conf.setSubscriptionType(subscriptionType);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder messageListener(MessageListener messageListener) {
+        conf.setMessageListener(messageListener);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder receiverQueueSize(int receiverQueueSize) {
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder consumerName(String consumerName) {
+        conf.setConsumerName(consumerName);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder priorityLevel(int priorityLevel) {
+        conf.setPriorityLevel(priorityLevel);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder property(String key, String value) {
+        conf.setProperty(key, value);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder properties(Map<String, String> properties) {
+        conf.setProperties(properties);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+        conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder readCompacted(boolean readCompacted) {
+        conf.setReadCompacted(readCompacted);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
new file mode 100644
index 0000000..6bb9a9b
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * {@link ProducerBuilder} implementation that collects the topic and a {@link ProducerConfiguration}, then
+ * delegates to {@code PulsarClientImpl#createProducerAsync(topicName, conf)}.
+ * <p>
+ * Every setter stores the value (directly or through the configuration object) and returns {@code this} so calls
+ * can be chained.
+ */
+@SuppressWarnings("deprecation")
+public class ProducerBuilderImpl implements ProducerBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+    private String topicName;
+    private final ProducerConfiguration conf;
+
+    ProducerBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ProducerConfiguration();
+    }
+
+    /**
+     * Returns a copy of this builder produced by {@link Object#clone()}.
+     * <p>
+     * The copy is shallow: the topic can be changed independently on the copy, but it refers to the same
+     * configuration object and client as this builder.
+     * NOTE(review): confirm whether the configuration should be copied as well.
+     */
+    @Override
+    public ProducerBuilder clone() {
+        try {
+            return (ProducerBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            // Keep the original exception as the cause so the failure remains diagnosable.
+            throw new RuntimeException("Failed to clone ProducerBuilderImpl", e);
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #createAsync()}.
+     *
+     * @throws PulsarClientException
+     *             the failure cause itself when it already is a {@link PulsarClientException}, otherwise a new
+     *             {@link PulsarClientException} wrapping the cause (or the {@link InterruptedException})
+     */
+    @Override
+    public Producer create() throws PulsarClientException {
+        try {
+            return createAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Creates the producer using the values collected so far.
+     *
+     * @return the future returned by the client, or an already-failed future carrying an
+     *         {@link IllegalArgumentException} when the topic has not been set
+     */
+    @Override
+    public CompletableFuture<Producer> createAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
+        }
+
+        return client.createProducerAsync(topicName, conf);
+    }
+
+    @Override
+    public ProducerBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder producerName(String producerName) {
+        conf.setProducerName(producerName);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) {
+        conf.setSendTimeout(sendTimeout, unit);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder maxPendingMessages(int maxPendingMessages) {
+        conf.setMaxPendingMessages(maxPendingMessages);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull) {
+        conf.setBlockIfQueueFull(blockIfQueueFull);
+        return this;
+    }
+
+    /**
+     * Translates the API-level {@link MessageRoutingMode} into the enum nested in {@link ProducerConfiguration}
+     * by constant name.
+     */
+    @Override
+    public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) {
+        // name() is the declared constant identifier, which is what valueOf() matches against; toString() may be
+        // overridden and is not guaranteed to round-trip.
+        conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.name()));
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder compressionType(CompressionType compressionType) {
+        conf.setCompressionType(compressionType);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder messageRouter(MessageRouter messageRouter) {
+        conf.setMessageRouter(messageRouter);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder enableBatching(boolean batchMessagesEnabled) {
+        conf.setBatchingEnabled(batchMessagesEnabled);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder addEncryptionKey(String key) {
+        conf.addEncryptionKey(key);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
+        conf.setBatchingMaxPublishDelay(batchDelay, timeUnit);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
+        conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder initialSequenceId(long initialSequenceId) {
+        conf.setInitialSequenceId(initialSequenceId);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder property(String key, String value) {
+        conf.setProperty(key, value);
+        return this;
+    }
+
+    @Override
+    public ProducerBuilder properties(Map<String, String> properties) {
+        conf.setProperties(properties);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 029f712..6e1c5af 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -32,13 +32,16 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -58,6 +61,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+@SuppressWarnings("deprecation")
 public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
@@ -92,8 +96,7 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, 
EventLoopGroup eventLoopGroup,
-            ConnectionPool cnxPool)
-            throws PulsarClientException {
+            ConnectionPool cnxPool) throws PulsarClientException {
         if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
             throw new 
PulsarClientException.InvalidConfigurationException("Invalid client 
configuration");
         }
@@ -118,6 +121,21 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @Override
+    public ProducerBuilder newProducer() {
+        return new ProducerBuilderImpl(this);
+    }
+
+    @Override
+    public ConsumerBuilder newConsumer() {
+        return new ConsumerBuilderImpl(this);
+    }
+
+    @Override
+    public ReaderBuilder newReader() {
+        return new ReaderBuilderImpl(this);
+    }
+
+    @Override
     public Producer createProducer(String destination) throws 
PulsarClientException {
         try {
             return createProducerAsync(destination, new 
ProducerConfiguration()).get();
@@ -157,6 +175,7 @@ public class PulsarClientImpl implements PulsarClient {
         return createProducerAsync(topic, new ProducerConfiguration());
     }
 
+    @Override
     public CompletableFuture<Producer> createProducerAsync(final String topic, 
final ProducerConfiguration conf) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
new file mode 100644
index 0000000..f375134
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * {@link ReaderBuilder} implementation that collects the topic, the start message id and a
+ * {@link ReaderConfiguration}, then delegates to
+ * {@code PulsarClientImpl#createReaderAsync(topicName, startMessageId, conf)}.
+ * <p>
+ * Every setter stores the value (directly or through the configuration object) and returns {@code this} so calls
+ * can be chained.
+ */
+@SuppressWarnings("deprecation")
+public class ReaderBuilderImpl implements ReaderBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClientImpl client;
+
+    private final ReaderConfiguration conf;
+    private String topicName;
+    private MessageId startMessageId;
+
+    ReaderBuilderImpl(PulsarClientImpl client) {
+        this.client = client;
+        this.conf = new ReaderConfiguration();
+    }
+
+    /**
+     * Returns a copy of this builder produced by {@link Object#clone()}.
+     * <p>
+     * The copy is shallow: topic and start message id can be changed independently on the copy, but it refers to
+     * the same configuration object and client as this builder.
+     * NOTE(review): confirm whether the configuration should be copied as well.
+     */
+    @Override
+    public ReaderBuilder clone() {
+        try {
+            return (ReaderBuilder) super.clone();
+        } catch (CloneNotSupportedException e) {
+            // Keep the original exception as the cause so the failure remains diagnosable.
+            throw new RuntimeException("Failed to clone ReaderBuilderImpl", e);
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #createAsync()}.
+     *
+     * @throws PulsarClientException
+     *             the failure cause itself when it already is a {@link PulsarClientException}, otherwise a new
+     *             {@link PulsarClientException} wrapping the cause (or the {@link InterruptedException})
+     */
+    @Override
+    public Reader create() throws PulsarClientException {
+        try {
+            return createAsync().get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Creates the reader using the values collected so far.
+     *
+     * @return the future returned by the client, or an already-failed future carrying an
+     *         {@link IllegalArgumentException} when the topic or the start message id has not been set
+     */
+    @Override
+    public CompletableFuture<Reader> createAsync() {
+        if (topicName == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
+        }
+
+        if (startMessageId == null) {
+            return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
+        }
+
+        return client.createReaderAsync(topicName, startMessageId, conf);
+    }
+
+    @Override
+    public ReaderBuilder topic(String topicName) {
+        this.topicName = topicName;
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder startMessageId(MessageId startMessageId) {
+        this.startMessageId = startMessageId;
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder readerListener(ReaderListener readerListener) {
+        conf.setReaderListener(readerListener);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) {
+        conf.setCryptoFailureAction(action);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder receiverQueueSize(int receiverQueueSize) {
+        conf.setReceiverQueueSize(receiverQueueSize);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder readerName(String readerName) {
+        conf.setReaderName(readerName);
+        return this;
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
index 5068a2c..f96d288 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
@@ -18,31 +18,25 @@
  */
 package org.apache.pulsar.client.api;
 
-import org.testng.annotations.Test;
-
-import com.google.common.base.Objects;
-
 import static org.testng.Assert.assertEquals;
 
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.ConsumerId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.testng.Assert;
+import org.testng.annotations.Test;
 
 public class MessageIdTest {
-    
+
     @Test
     public void messageIdTest() {
         MessageId mId = new MessageIdImpl(1, 2, 3);
         assertEquals(mId.toString(), "1:2:3");
-        
+
         mId = new BatchMessageIdImpl(0, 2, 3, 4);
         assertEquals(mId.toString(), "0:2:3:4");
-        
+
         mId = new BatchMessageIdImpl(-1, 2, -3, 4);
         assertEquals(mId.toString(), "-1:2:-3:4");
-        
+
         mId = new MessageIdImpl(0, -23, 3);
         assertEquals(mId.toString(), "0:-23:3");
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
similarity index 50%
copy from 
pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
copy to 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index 5068a2c..f4b89d5 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -16,34 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api;
+package org.apache.pulsar.client.impl;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.client.api.PulsarClient;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Objects;
+@SuppressWarnings("deprecation")
+public class BuildersTest {
 
-import static org.testng.Assert.assertEquals;
+    @Test
+    public void clientBuilderTest() {
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().enableTls(true).ioThreads(10)
+                
.maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.ConsumerId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.testng.Assert;
+        assertEquals(clientBuilder.conf.isUseTls(), true);
+        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
 
-public class MessageIdTest {
-    
-    @Test
-    public void messageIdTest() {
-        MessageId mId = new MessageIdImpl(1, 2, 3);
-        assertEquals(mId.toString(), "1:2:3");
-        
-        mId = new BatchMessageIdImpl(0, 2, 3, 4);
-        assertEquals(mId.toString(), "0:2:3:4");
-        
-        mId = new BatchMessageIdImpl(-1, 2, -3, 4);
-        assertEquals(mId.toString(), "-1:2:-3:4");
-        
-        mId = new MessageIdImpl(0, -23, 3);
-        assertEquals(mId.toString(), "0:-23:3");
+        ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
+        assertTrue(b2 != clientBuilder);
+
+        b2.serviceUrl("pulsar://other-broker:6650");
+
+        assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
+        assertEquals(b2.serviceUrl, "pulsar://other-broker:6650");
     }
+
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
index 43b7b26..97f25b1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
@@ -26,14 +26,14 @@ import org.apache.pulsar.client.api.PulsarClientException;
 
 public class SampleProducer {
     public static void main(String[] args) throws PulsarClientException, 
InterruptedException, IOException {
-        PulsarClient pulsarClient = PulsarClient.create("http://127.0.0.1:8080");
+        PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:6650").build();
 
-        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
+        Producer producer = 
client.newProducer().topic("persistent://my-property/use/my-ns/my-topic").create();
 
         for (int i = 0; i < 10; i++) {
             producer.send("my-message".getBytes());
         }
 
-        pulsarClient.close();
+        client.close();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to