This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e336b4  Add Client auth plugin and tls support for function to 
connect with broker (#1935)
6e336b4 is described below

commit 6e336b4080f02370c667e1a93d5d7c65b982d562
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Jun 8 13:00:12 2018 -0700

    Add Client auth plugin and tls support for function to connect with broker 
(#1935)
    
    * Add Client auth plugin and tls support for function to connect with broker
    
    * add authConfig builder
    
    * add hostnameverification and tlsCertPath
    
    * add broker-tls url on worker
    
    * take string type for boolean data-type
---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 89 ++++++++++++++++++----
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 36 +++++++--
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 29 ++++++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 29 ++++++-
 .../client/impl/conf/ClientConfigurationData.java  |  2 +-
 .../functions/instance/AuthenticationConfig.java   | 44 +++++++++++
 .../src/main/python/python_instance_main.py        | 21 ++++-
 .../pulsar/functions/runtime/JavaInstanceMain.java | 46 ++++++++---
 .../pulsar/functions/runtime/ProcessRuntime.java   | 31 +++++++-
 .../functions/runtime/ProcessRuntimeFactory.java   |  8 +-
 .../functions/runtime/ThreadRuntimeFactory.java    | 41 +++++++---
 .../functions/runtime/ProcessRuntimeTest.java      |  2 +-
 .../functions/worker/FunctionRuntimeManager.java   | 12 ++-
 .../pulsar/functions/worker/MembershipManager.java |  4 +-
 .../org/apache/pulsar/functions/worker/Utils.java  | 17 ++++-
 .../org/apache/pulsar/functions/worker/Worker.java |  4 +-
 .../pulsar/functions/worker/WorkerConfig.java      |  8 +-
 .../pulsar/functions/worker/WorkerService.java     | 15 +++-
 18 files changed, 375 insertions(+), 63 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5089e95..328d6b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.Mockito.spy;
 
@@ -27,6 +28,8 @@ import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -40,14 +43,17 @@ import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -87,7 +93,7 @@ public class PulsarSinkE2ETest {
 
     ServiceConfiguration config;
     WorkerConfig workerConfig;
-    URL url;
+    URL urlTls;
     PulsarService pulsar;
     PulsarAdmin admin;
     PulsarClient pulsarClient;
@@ -102,8 +108,16 @@ public class PulsarSinkE2ETest {
 
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
     private final int brokerServicePort = PortManager.nextFreePort();
+    private final int brokerServiceTlsPort = PortManager.nextFreePort();
     private final int workerServicePort = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarSinkE2ETest.class);
 
     @BeforeMethod
@@ -118,31 +132,67 @@ public class PulsarSinkE2ETest {
         bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
         bkEnsemble.start();
 
-        String hostHttpUrl = "http://127.0.0.1"; + ":";
+        String brokerServiceUrl = "https://127.0.0.1:"; + 
brokerWebServiceTlsPort;
 
         config = spy(new ServiceConfiguration());
         config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
         config.setWebServicePort(brokerWebServicePort);
+        config.setWebServicePortTls(brokerWebServiceTlsPort);
         config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
         config.setBrokerServicePort(brokerServicePort);
+        config.setBrokerServicePortTls(brokerServiceTlsPort);
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+        config.setTlsEnabled(true);
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsAllowInsecureConnection(true);
+    
+
         functionsWorkerService = createPulsarFunctionWorker(config);
-        url = new URL(hostHttpUrl + brokerWebServicePort);
+        urlTls = new URL(brokerServiceUrl);
         boolean isFunctionWebServerRequired = method.getName()
                 .equals("testExternalReplicatorRedirectionToWorkerService");
         Optional<WorkerService> functionWorkerService = 
isFunctionWebServerRequired ? Optional.ofNullable(null)
                 : Optional.of(functionsWorkerService);
         pulsar = new PulsarService(config, functionWorkerService);
         pulsar.start();
-        admin = new PulsarAdmin(url, (Authentication) null);
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        admin = spy(
+                
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
+                        
.allowTlsInsecureConnection(true).authentication(authTls).build());
+
         brokerStatsClient = admin.brokerStats();
         primaryHost = String.format("http://%s:%d";, 
InetAddress.getLocalHost().getHostName(), brokerWebServicePort);
 
         // update cluster metadata
-        ClusterData clusterData = new ClusterData(url.toString());
+        ClusterData clusterData = new ClusterData(urlTls.toString());
         admin.clusters().updateCluster(config.getClusterName(), clusterData);
 
-        pulsarClient = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
+        ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && 
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+        // pulsarClient = 
PulsarClient.builder().serviceUrl(urlTls.toString()).statsInterval(0,
+        // TimeUnit.SECONDS).build();
 
         TenantInfo propAdmin = new TenantInfo();
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
@@ -166,6 +216,7 @@ public class PulsarSinkE2ETest {
         if (workerExecutor != null) {
             workerExecutor.shutdown();
         }
+        pulsarClient.close();
         admin.close();
         pulsar.close();
         functionsWorkerService.stop();
@@ -179,8 +230,8 @@ public class PulsarSinkE2ETest {
                 
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
         workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
         // worker talks to local broker
-        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + 
config.getBrokerServicePort());
-        workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:"; + 
config.getWebServicePort());
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + 
config.getBrokerServicePortTls());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:"; + 
config.getWebServicePortTls());
         workerConfig.setFailureCheckFreqMs(100);
         workerConfig.setNumFunctionPackageReplicas(1);
         workerConfig.setClusterCoordinationTopicName("coordinate");
@@ -193,11 +244,19 @@ public class PulsarSinkE2ETest {
         workerConfig.setWorkerHostname(hostname);
         workerConfig
                 .setWorkerId("c-" + config.getClusterName() + "-fw-" + 
hostname + "-" + workerConfig.getWorkerPort());
+
+        
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", 
TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
         return new WorkerService(workerConfig);
     }
 
     /**
-     * Validates pulsar sink e2e functionality on functions.  
+     * Validates pulsar sink e2e functionality on functions.
      * 
      * @throws Exception
      */
@@ -236,8 +295,8 @@ public class PulsarSinkE2ETest {
         }
         retryStrategically((test) -> {
             try {
-                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values()
-                        .iterator().next();
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
+                        .next();
                 return subStats.unackedMessages == 0;
             } catch (PulsarAdminException e) {
                 return false;
@@ -245,8 +304,8 @@ public class PulsarSinkE2ETest {
         }, 5, 150);
         // validate pulsar-sink consumer has consumed all messages and 
delivered to Pulsar sink but unacked messages
         // due to publish failure
-        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
-                .next().unackedMessages, 0);
+        Assert.assertEquals(
+                
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
 0);
 
     }
 
@@ -281,7 +340,7 @@ public class PulsarSinkE2ETest {
 
         // set up sink spec
         SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        //sinkSpecBuilder.setClassName(PulsarSink.class.getName());
+        // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
         sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", 
tenant, namespace, "output"));
         Map<String, Object> sinkConfigMap = Maps.newHashMap();
         sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
@@ -290,4 +349,4 @@ public class PulsarSinkE2ETest {
 
         return functionDetailsBuilder.build();
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 6915774..4b17ff5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -51,6 +51,8 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
@@ -642,11 +644,34 @@ public class CmdFunctions extends CmdBase {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
         protected String brokerServiceUrl;
+        
+        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker")
+        protected String clientAuthPlugin;
+        
+        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param")
+        protected String clientAuthParams;
+        
+        @Parameter(names = "--use_tls", description = "Use tls connection\n")
+        protected boolean useTls;
+
+        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n")
+        protected boolean tlsAllowInsecureConnection;
+        
+        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+        protected boolean tlsHostNameVerificationEnabled;
+        
+        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path")
+        protected String tlsTrustCertFilePath;
 
         @Override
         void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(convertProto2(functionConfig),
-                    functionConfig.getParallelism(), brokerServiceUrl, 
userCodeFile, admin);
+            CmdFunctions.startLocalRun(convertProto2(functionConfig), 
functionConfig.getParallelism(), brokerServiceUrl,
+                    
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+                            
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                            
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                            
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                            
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                    userCodeFile, admin);
         }
     }
 
@@ -911,7 +936,8 @@ public class CmdFunctions extends CmdBase {
     }
 
     protected static void 
startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails 
functionDetails,
-                                        int parallelism, String 
brokerServiceUrl, String userCodeFile, PulsarAdmin admin)
+            int parallelism, String brokerServiceUrl, AuthenticationConfig 
authConfig,
+            String userCodeFile, PulsarAdmin admin)
             throws Exception {
 
         String serviceUrl = admin.getServiceUrl();
@@ -921,8 +947,8 @@ public class CmdFunctions extends CmdBase {
         if (serviceUrl == null) {
             serviceUrl = DEFAULT_SERVICE_URL;
         }
-        try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(
-                serviceUrl, null, null, null)) {
+        try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(serviceUrl, authConfig, null, null,
+                null)) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 6f67ee7..59b9201 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
@@ -99,11 +100,35 @@ public class CmdSinks extends CmdBase {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
         protected String brokerServiceUrl;
+        
+        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker")
+        protected String clientAuthPlugin;
+        
+        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param")
+        protected String clientAuthParams;
+        
+        @Parameter(names = "--use_tls", description = "Use tls connection\n")
+        protected boolean useTls;
+
+        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n")
+        protected boolean tlsAllowInsecureConnection;
+        
+        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+        protected boolean tlsHostNameVerificationEnabled;
+        
+        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path")
+        protected String tlsTrustCertFilePath;
 
         @Override
         void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
-                    sinkConfig.getParallelism(), brokerServiceUrl, jarFile, 
admin);
+            CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), 
sinkConfig.getParallelism(),
+                    brokerServiceUrl,
+                    
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+                            
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                            
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                            
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                            
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                    jarFile, admin);
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 8348382..a3bec4a 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -95,11 +96,35 @@ public class CmdSources extends CmdBase {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
         protected String brokerServiceUrl;
+        
+        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker")
+        protected String clientAuthPlugin;
+        
+        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param")
+        protected String clientAuthParams;
+        
+        @Parameter(names = "--use_tls", description = "Use tls connection\n")
+        protected boolean useTls;
+
+        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n")
+        protected boolean tlsAllowInsecureConnection;
+        
+        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+        protected boolean tlsHostNameVerificationEnabled;
+        
+        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path")
+        protected String tlsTrustCertFilePath;
 
         @Override
         void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
-                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, 
admin);
+            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), 
sourceConfig.getParallelism(),
+                    brokerServiceUrl,
+                    
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+                            
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                            
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                            
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                            
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                    jarFile, admin);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1d3280b..cedc892 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -55,7 +55,7 @@ public class ClientConfigurationData implements Serializable, 
Cloneable {
     private int maxLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
-
+    
     public ClientConfigurationData clone() {
         try {
             return (ClientConfigurationData) super.clone();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
new file mode 100644
index 0000000..f4c78ea
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Configuration to aggregate various authentication params.
+ */
+@Data
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+@Builder
+public class AuthenticationConfig {
+    private String clientAuthenticationPlugin;
+    private String clientAuthenticationParameters;
+    private String tlsTrustCertsFilePath;
+    private boolean useTls;
+    private boolean tlsAllowInsecureConnection;
+    private boolean tlsHostnameVerificationEnable;
+}
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index b3eaa1c..a3c1179 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -30,6 +30,7 @@ import signal
 import time
 import json
 
+from pulsar import Authentication
 import pulsar
 
 import Function_pb2
@@ -63,6 +64,12 @@ def main():
   parser.add_argument('--function_version', required=True, help='Function 
Version')
   parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
+  parser.add_argument('--client_auth_plugin', required=False, help='Client 
authentication plugin')
+  parser.add_argument('--client_auth_params', required=False, help='Client 
authentication params')
+  parser.add_argument('--use_tls', required=False, help='Use tls')
+  parser.add_argument('--tls_allow_insecure_connection', required=False, 
help='Tls allow insecure connection')
+  parser.add_argument('--hostname_verification_enabled', required=False, 
help='Enable hostname verification')
+  parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust 
cert file path')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
   parser.add_argument('--user_config', required=False, help='User Config')
@@ -124,7 +131,19 @@ def main():
   if args.user_config != None and len(args.user_config) != 0:
     function_details.userConfig = args.user_config
 
-  pulsar_client = pulsar.Client(args.pulsar_serviceurl)
+  authentication = None
+  use_tls = False
+  tls_allow_insecure_connection = False
+  tls_trust_cert_path = None
+  if args.client_auth_plugin and args.client_auth_params:
+      authentication = pulsar.Authentication(args.client_auth_plugin, 
args.client_auth_params)
+  if args.use_tls == "true":
+    use_tls = True
+  if args.tls_allow_insecure_connection == "true":
+    tls_allow_insecure_connection = True
+  if args.tls_trust_cert_path:
+     tls_trust_cert_path =  args.tls_trust_cert_path
+  pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 
1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), 
str(args.function_id),
                                               str(args.function_version), 
function_details,
                                               int(args.max_buffered_tuples), 
str(args.py),
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index b602a1d..554f9ac 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -30,6 +30,8 @@ import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -39,7 +41,6 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.lang.reflect.Type;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
@@ -82,7 +83,25 @@ public class JavaInstanceMain implements AutoCloseable {
 
     @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service 
Url\n", required = true)
     protected String pulsarServiceUrl;
-
+    
+    @Parameter(names = "--client_auth_plugin", description = "Client auth 
plugin name\n")
+    protected String clientAuthenticationPlugin;
+    
+    @Parameter(names = "--client_auth_params", description = "Client auth 
param\n")
+    protected String clientAuthenticationParameters;
+    
+    @Parameter(names = "--use_tls", description = "Use tls connection\n")
+    protected String useTls = Boolean.FALSE.toString();
+    
+    @Parameter(names = "--tls_allow_insecure", description = "Allow insecure 
tls connection\n")
+    protected String tlsAllowInsecureConnection = Boolean.TRUE.toString();
+    
+    @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+    protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
+    
+    @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert 
file path")
+    protected String tlsTrustCertFilePath;
+    
     @Parameter(names = "--state_storage_serviceurl", description = "State 
Storage Service Url\n", required= false)
     protected String stateStorageServiceUrl;
 
@@ -96,7 +115,7 @@ public class JavaInstanceMain implements AutoCloseable {
     protected String userConfig;
 
     @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
-    protected String autoAck = "true";
+    protected String autoAck = Boolean.TRUE.toString();
 
     @Parameter(names = "--source_classname", description = "The source 
classname")
     protected String sourceClassname;
@@ -158,11 +177,7 @@ public class JavaInstanceMain implements AutoCloseable {
             functionDetailsBuilder.setLogTopic(logTopic);
         }
         functionDetailsBuilder.setProcessingGuarantees(processingGuarantees);
-        if (autoAck.equals("true")) {
-            functionDetailsBuilder.setAutoAck(true);
-        } else {
-            functionDetailsBuilder.setAutoAck(false);
-        }
+        functionDetailsBuilder.setAutoAck(isTrue(autoAck));
         if (userConfig != null && !userConfig.isEmpty()) {
             functionDetailsBuilder.setUserConfig(userConfig);
         }
@@ -207,10 +222,13 @@ public class JavaInstanceMain implements AutoCloseable {
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
 
-        ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory(
-                "LocalRunnerThreadGroup",
-                pulsarServiceUrl,
-                stateStorageServiceUrl);
+        ThreadRuntimeFactory containerFactory = new 
ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
+                stateStorageServiceUrl,
+                
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
+                        
.clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
+                        
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
+                        
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
+                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build());
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -257,6 +275,10 @@ public class JavaInstanceMain implements AutoCloseable {
         close();
     }
 
+    private static boolean isTrue(String param) {
+        return Boolean.TRUE.toString().equals(param);
+    }
+
     public static void main(String[] args) throws Exception {
         JavaInstanceMain javaInstanceMain = new JavaInstanceMain();
         JCommander jcommander = new JCommander(javaInstanceMain);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 715eab6..e94e39d 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -30,6 +30,9 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -66,17 +69,20 @@ class ProcessRuntime implements Runtime {
                    String instanceFile,
                    String logDirectory,
                    String codeFile,
-                   String pulsarServiceUrl) {
+                   String pulsarServiceUrl,
+                   AuthenticationConfig authConfig) {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
-        this.processArgs = composeArgs(instanceConfig, instanceFile, 
logDirectory, codeFile, pulsarServiceUrl);
+        this.processArgs = composeArgs(instanceConfig, instanceFile, 
logDirectory, codeFile, pulsarServiceUrl,
+                authConfig);
     }
 
     private List<String> composeArgs(InstanceConfig instanceConfig,
                                      String instanceFile,
                                      String logDirectory,
                                      String codeFile,
-                                     String pulsarServiceUrl) {
+                                     String pulsarServiceUrl,
+                                     AuthenticationConfig authConfig) {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -135,6 +141,25 @@ class ProcessRuntime implements Runtime {
         
args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
         args.add("--pulsar_serviceurl");
         args.add(pulsarServiceUrl);
+        if (authConfig != null) {
+            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+                    && 
isNotBlank(authConfig.getClientAuthenticationParameters())) {
+                args.add("--client_auth_plugin");
+                args.add(authConfig.getClientAuthenticationPlugin());
+                args.add("--client_auth_params");
+                args.add(authConfig.getClientAuthenticationParameters());
+            }
+            args.add("--use_tls");
+            args.add(Boolean.toString(authConfig.isUseTls()));
+            args.add("--tls_allow_insecure");
+            
args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
+            args.add("--hostname_verification_enabled");
+            
args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
+            if(isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
+                args.add("--tls_trust_cert_path");
+                args.add(authConfig.getTlsTrustCertsFilePath());
+            }
+        }
         args.add("--max_buffered_tuples");
         args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
         String userConfig = 
instanceConfig.getFunctionDetails().getUserConfig();
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 2223cb7..8fc5b90 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 
 import java.nio.file.Paths;
@@ -32,17 +34,20 @@ import java.nio.file.Paths;
 public class ProcessRuntimeFactory implements RuntimeFactory {
 
     private String pulsarServiceUrl;
+    private AuthenticationConfig authConfig;
     private String javaInstanceJarFile;
     private String pythonInstanceFile;
     private String logDirectory;
 
     @VisibleForTesting
     public ProcessRuntimeFactory(String pulsarServiceUrl,
+                                 AuthenticationConfig authConfig,
                                  String javaInstanceJarFile,
                                  String pythonInstanceFile,
                                  String logDirectory) {
 
         this.pulsarServiceUrl = pulsarServiceUrl;
+        this.authConfig = authConfig;
         this.javaInstanceJarFile = javaInstanceJarFile;
         this.pythonInstanceFile = pythonInstanceFile;
         this.logDirectory = logDirectory;
@@ -100,7 +105,8 @@ public class ProcessRuntimeFactory implements 
RuntimeFactory {
             instanceFile,
             logDirectory,
             codeFile,
-            pulsarServiceUrl);
+            pulsarServiceUrl,
+            authConfig);
     }
 
     @Override
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index df89eed..f9c9cff 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -23,8 +23,13 @@ import com.google.common.annotations.VisibleForTesting;
 
 import lombok.extern.slf4j.Slf4j;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
@@ -41,26 +46,40 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
     private final String storageServiceUrl;
     private volatile boolean closed;
 
-    public ThreadRuntimeFactory(String threadGroupName,
-                                String pulsarServiceUrl,
-                                String storageServiceUrl)
-            throws Exception {
-        this(
-            threadGroupName,
-            pulsarServiceUrl != null ? 
PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null,
-            storageServiceUrl);
+    public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl, String storageServiceUrl,
+            AuthenticationConfig authConfig) throws Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, 
authConfig), storageServiceUrl);
     }
 
     @VisibleForTesting
-    ThreadRuntimeFactory(String threadGroupName,
-                         PulsarClient pulsarClient,
-                         String storageServiceUrl) {
+    ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, 
String storageServiceUrl) {
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
         this.storageServiceUrl = storageServiceUrl;
     }
 
+    private static PulsarClient createPulsarClient(String pulsarServiceUrl, 
AuthenticationConfig authConfig)
+            throws PulsarClientException {
+        ClientBuilder clientBuilder = null;
+        if (isNotBlank(pulsarServiceUrl)) {
+            clientBuilder = 
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            if (authConfig != null) {
+                if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+                        && 
isNotBlank(authConfig.getClientAuthenticationParameters())) {
+                    
clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
+                            authConfig.getClientAuthenticationParameters());
+                }
+                clientBuilder.enableTls(authConfig.isUseTls());
+                
clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
+                
clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
+                
clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
+            }
+            return clientBuilder.build();
+        }
+        return null;
+    }
+    
     @Override
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String 
jarFile) {
         return new ThreadRuntime(
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 65e8d89..675f3de 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -66,7 +66,7 @@ public class ProcessRuntimeTest {
         this.pulsarServiceUrl = "pulsar://localhost:6670";
         this.logDirectory = "Users/user/logs";
         this.factory = new ProcessRuntimeFactory(
-            pulsarServiceUrl, javaInstanceJarFile, pythonInstanceFile, 
logDirectory);
+            pulsarServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, 
logDirectory);
     }
 
     @AfterMethod
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index af23992..ba1a33d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -92,14 +93,23 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
 
+        AuthenticationConfig authConfig = AuthenticationConfig.builder()
+                
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
+                
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
+                .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
+                
.useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
+                
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
+        
         if (workerConfig.getThreadContainerFactory() != null) {
             this.runtimeFactory = new ThreadRuntimeFactory(
                     
workerConfig.getThreadContainerFactory().getThreadGroupName(),
                     workerConfig.getPulsarServiceUrl(),
-                    workerConfig.getStateStorageServiceUrl());
+                    workerConfig.getStateStorageServiceUrl(),
+                    authConfig);
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
+                    authConfig,
                     
workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                     
workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
                     
workerConfig.getProcessContainerFactory().getLogDirectory());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index dde017f..de47a33 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -280,7 +280,9 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 
     private PulsarAdmin getPulsarAdminClient() {
         if (this.pulsarAdminClient == null) {
-            this.pulsarAdminClient = 
Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl());
+            this.pulsarAdminClient = 
Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
+                    workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
+                    workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
         }
         return this.pulsarAdminClient;
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 51de807..576257b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -35,7 +35,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -44,6 +44,7 @@ import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
@@ -212,9 +213,17 @@ public final class Utils {
         return dlogUri;
     }
 
-    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) 
{
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams, String tlsTrustCertsFilePath, boolean 
allowTlsInsecureConnection) {
         try {
-            return 
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl).build();
+            PulsarAdminBuilder adminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
+            if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
+                adminBuilder.authentication(authPlugin, authParams);
+            }
+            if (isNotBlank(tlsTrustCertsFilePath)) {
+                adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+            }
+            
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+            return adminBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar admin client", e);
             throw new RuntimeException(e);
@@ -235,7 +244,7 @@ public final class Utils {
     }
     
     public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) 
{
-        return StringUtils.isNotBlank(functionPkgUrl)
+        return isNotBlank(functionPkgUrl)
                 && (functionPkgUrl.startsWith(Utils.HTTP) || 
functionPkgUrl.startsWith(Utils.FILE));
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 4f700ab..6e87b94 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -71,7 +71,9 @@ public class Worker extends AbstractService {
     private static URI initialize(WorkerConfig workerConfig)
             throws InterruptedException, PulsarAdminException, IOException {
         // initializing pulsar functions namespace
-        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
+        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+                workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
+                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
         InternalConfigurationData internalConf;
         // make sure pulsar broker is up
         log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 535a7dd..efbd0fd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -63,7 +63,13 @@ public class WorkerConfig implements Serializable {
     private int initialBrokerReconnectMaxRetries;
     private int assignmentWriteMaxRetries;
     private long instanceLivenessCheckFreqMs;
-
+    private String clientAuthenticationPlugin;
+    private String clientAuthenticationParameters;
+    private boolean useTls = false;
+    private String tlsTrustCertsFilePath = "";
+    private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsHostnameVerificationEnable = false;
+    
     @Data
     @Setter
     @Getter
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index dfe5834..4740d69 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -23,9 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.net.URI;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
@@ -77,7 +80,17 @@ public class WorkerService {
         // initialize the function metadata manager
         try {
 
-            this.client = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build();
+            ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+            if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                    && 
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+                
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                        workerConfig.getClientAuthenticationParameters());
+            }
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            
clientBuilder.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath());
+            
clientBuilder.enableTlsHostnameVerification(workerConfig.isTlsHostnameVerificationEnable());
+            this.client = clientBuilder.build();
             log.info("Created Pulsar client");
 
             //create scheduler manager

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to