This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6e336b4 Add Client auth plugin and tls support for function to connect with broker (#1935) 6e336b4 is described below commit 6e336b4080f02370c667e1a93d5d7c65b982d562 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Jun 8 13:00:12 2018 -0700 Add Client auth plugin and tls support for function to connect with broker (#1935) * Add Client auth plugin and tls support for function to connect with broker * add authConfig builder * add hostnameverification and tlsCertPath * add broker-tls url on worker * take string type for boolean data-type --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 89 ++++++++++++++++++---- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 36 +++++++-- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 29 ++++++- .../org/apache/pulsar/admin/cli/CmdSources.java | 29 ++++++- .../client/impl/conf/ClientConfigurationData.java | 2 +- .../functions/instance/AuthenticationConfig.java | 44 +++++++++++ .../src/main/python/python_instance_main.py | 21 ++++- .../pulsar/functions/runtime/JavaInstanceMain.java | 46 ++++++++--- .../pulsar/functions/runtime/ProcessRuntime.java | 31 +++++++- .../functions/runtime/ProcessRuntimeFactory.java | 8 +- .../functions/runtime/ThreadRuntimeFactory.java | 41 +++++++--- .../functions/runtime/ProcessRuntimeTest.java | 2 +- .../functions/worker/FunctionRuntimeManager.java | 12 ++- .../pulsar/functions/worker/MembershipManager.java | 4 +- .../org/apache/pulsar/functions/worker/Utils.java | 17 ++++- .../org/apache/pulsar/functions/worker/Worker.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 8 +- .../pulsar/functions/worker/WorkerService.java | 15 +++- 18 files changed, 375 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5089e95..328d6b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.Mockito.spy; @@ -27,6 +28,8 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -40,14 +43,17 @@ import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -87,7 +93,7 @@ public class PulsarSinkE2ETest { ServiceConfiguration config; WorkerConfig workerConfig; - URL url; + URL urlTls; PulsarService pulsar; PulsarAdmin admin; PulsarClient pulsarClient; @@ -102,8 +108,16 @@ public class PulsarSinkE2ETest { private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); + private final int brokerWebServiceTlsPort = PortManager.nextFreePort(); private final int brokerServicePort = PortManager.nextFreePort(); + private final int brokerServiceTlsPort = PortManager.nextFreePort(); private final int workerServicePort = PortManager.nextFreePort(); + + private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem"; + private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; + private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; + private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class); @BeforeMethod @@ -118,31 +132,67 @@ public class PulsarSinkE2ETest { bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); bkEnsemble.start(); - String hostHttpUrl = "http://127.0.0.1" + ":"; + String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort; config = spy(new ServiceConfiguration()); config.setClusterName("use"); + Set<String> superUsers = Sets.newHashSet("superUser"); + config.setSuperUserRoles(superUsers); config.setWebServicePort(brokerWebServicePort); + config.setWebServicePortTls(brokerWebServiceTlsPort); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setBrokerServicePort(brokerServicePort); + config.setBrokerServicePortTls(brokerServiceTlsPort); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + + + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderTls.class.getName()); + config.setAuthenticationEnabled(true); + config.setAuthenticationProviders(providers); + config.setTlsEnabled(true); + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + config.setTlsAllowInsecureConnection(true); + + functionsWorkerService = createPulsarFunctionWorker(config); - url = new URL(hostHttpUrl + brokerWebServicePort); + urlTls = new URL(brokerServiceUrl); boolean isFunctionWebServerRequired = method.getName() .equals("testExternalReplicatorRedirectionToWorkerService"); Optional<WorkerService> functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null) : Optional.of(functionsWorkerService); pulsar = new PulsarService(config, functionWorkerService); pulsar.start(); - admin = new PulsarAdmin(url, (Authentication) null); + + Map<String, String> authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + + admin = spy( + PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH) + .allowTlsInsecureConnection(true).authentication(authTls).build()); + brokerStatsClient = admin.brokerStats(); primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(), brokerWebServicePort); // update cluster metadata - ClusterData clusterData = new ClusterData(url.toString()); + ClusterData clusterData = new ClusterData(urlTls.toString()); admin.clusters().updateCluster(config.getClusterName(), clusterData); - pulsarClient = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()); + if (isNotBlank(workerConfig.getClientAuthenticationPlugin()) + && isNotBlank(workerConfig.getClientAuthenticationParameters())) { + clientBuilder.enableTls(workerConfig.isUseTls()); + clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection()); + clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(), + workerConfig.getClientAuthenticationParameters()); + } + pulsarClient = clientBuilder.build(); + // pulsarClient = PulsarClient.builder().serviceUrl(urlTls.toString()).statsInterval(0, + // TimeUnit.SECONDS).build(); TenantInfo propAdmin = new TenantInfo(); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); @@ -166,6 +216,7 @@ public class PulsarSinkE2ETest { if (workerExecutor != null) { workerExecutor.shutdown(); } + pulsarClient.close(); admin.close(); pulsar.close(); functionsWorkerService.stop(); @@ -179,8 +230,8 @@ public class PulsarSinkE2ETest { org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort()); - workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls()); workerConfig.setFailureCheckFreqMs(100); workerConfig.setNumFunctionPackageReplicas(1); workerConfig.setClusterCoordinationTopicName("coordinate"); @@ -193,11 +244,19 @@ public class PulsarSinkE2ETest { workerConfig.setWorkerHostname(hostname); workerConfig .setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort()); + + workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName()); + workerConfig.setClientAuthenticationParameters( + String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH)); + workerConfig.setUseTls(true); + workerConfig.setTlsAllowInsecureConnection(true); + workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); + return new WorkerService(workerConfig); } /** - * Validates pulsar sink e2e functionality on functions. + * Validates pulsar sink e2e functionality on functions. * * @throws Exception */ @@ -236,8 +295,8 @@ public class PulsarSinkE2ETest { } retryStrategically((test) -> { try { - SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values() - .iterator().next(); + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() + .next(); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -245,8 +304,8 @@ public class PulsarSinkE2ETest { }, 5, 150); // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages // due to publish failure - Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator() - .next().unackedMessages, 0); + Assert.assertEquals( + admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, 0); } @@ -281,7 +340,7 @@ public class PulsarSinkE2ETest { // set up sink spec SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - //sinkSpecBuilder.setClassName(PulsarSink.class.getName()); + // sinkSpecBuilder.setClassName(PulsarSink.class.getName()); sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output")); Map<String, Object> sinkConfigMap = Maps.newHashMap(); sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap)); @@ -290,4 +349,4 @@ public class PulsarSinkE2ETest { return functionDetailsBuilder.build(); } -} +} \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 6915774..4b17ff5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -51,6 +51,8 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; @@ -642,11 +644,34 @@ public class CmdFunctions extends CmdBase { @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") protected String brokerServiceUrl; + + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker") + protected String clientAuthPlugin; + + @Parameter(names = "--clientAuthParams", description = "Client authentication param") + protected String clientAuthParams; + + @Parameter(names = "--use_tls", description = "Use tls connection\n") + protected boolean useTls; + + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + protected boolean tlsAllowInsecureConnection; + + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + protected boolean tlsHostNameVerificationEnabled; + + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + protected String tlsTrustCertFilePath; @Override void runCmd() throws Exception { - CmdFunctions.startLocalRun(convertProto2(functionConfig), - functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin); + CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), brokerServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + userCodeFile, admin); } } @@ -911,7 +936,8 @@ public class CmdFunctions extends CmdBase { } protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails, - int parallelism, String brokerServiceUrl, String userCodeFile, PulsarAdmin admin) + int parallelism, String brokerServiceUrl, AuthenticationConfig authConfig, + String userCodeFile, PulsarAdmin admin) throws Exception { String serviceUrl = admin.getServiceUrl(); @@ -921,8 +947,8 @@ public class CmdFunctions extends CmdBase { if (serviceUrl == null) { serviceUrl = DEFAULT_SERVICE_URL; } - try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory( - serviceUrl, null, null, null)) { + try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, authConfig, null, null, + null)) { List<RuntimeSpawner> spawners = new LinkedList<>(); for (int i = 0; i < parallelism; ++i) { InstanceConfig instanceConfig = new InstanceConfig(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 6f67ee7..59b9201 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -29,6 +29,7 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; @@ -99,11 +100,35 @@ public class CmdSinks extends CmdBase { @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") protected String brokerServiceUrl; + + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker") + protected String clientAuthPlugin; + + @Parameter(names = "--clientAuthParams", description = "Client authentication param") + protected String clientAuthParams; + + @Parameter(names = "--use_tls", description = "Use tls connection\n") + protected boolean useTls; + + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + protected boolean tlsAllowInsecureConnection; + + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + protected boolean tlsHostNameVerificationEnabled; + + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + protected String tlsTrustCertFilePath; @Override void runCmd() throws Exception { - CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), - sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin); + CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(), + brokerServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + jarFile, admin); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 8348382..a3bec4a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -29,6 +29,7 @@ import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; import org.apache.pulsar.functions.proto.Function.SinkSpec; @@ -95,11 +96,35 @@ public class CmdSources extends CmdBase { @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") protected String brokerServiceUrl; + + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker") + protected String clientAuthPlugin; + + @Parameter(names = "--clientAuthParams", description = "Client authentication param") + protected String clientAuthParams; + + @Parameter(names = "--use_tls", description = "Use tls connection\n") + protected boolean useTls; + + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + protected boolean tlsAllowInsecureConnection; + + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + protected boolean tlsHostNameVerificationEnabled; + + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + protected String tlsTrustCertFilePath; @Override void runCmd() throws Exception { - CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), - sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin); + CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(), + brokerServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + jarFile, admin); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 1d3280b..cedc892 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -55,7 +55,7 @@ public class ClientConfigurationData implements Serializable, Cloneable { private int maxLookupRequest = 50000; private int maxNumberOfRejectedRequestPerConnection = 50; private int keepAliveIntervalSeconds = 30; - + public ClientConfigurationData clone() { try { return (ClientConfigurationData) super.clone(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java new file mode 100644 index 0000000..f4c78ea --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * Configuration to aggregate various authentication params. + */ +@Data +@Getter +@Setter +@EqualsAndHashCode +@ToString +@Builder +public class AuthenticationConfig { + private String clientAuthenticationPlugin; + private String clientAuthenticationParameters; + private String tlsTrustCertsFilePath; + private boolean useTls; + private boolean tlsAllowInsecureConnection; + private boolean tlsHostnameVerificationEnable; +} diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index b3eaa1c..a3c1179 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -30,6 +30,7 @@ import signal import time import json +from pulsar import Authentication import pulsar import Function_pb2 @@ -63,6 +64,12 @@ def main(): parser.add_argument('--function_version', required=True, help='Function Version') parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees') parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url') + parser.add_argument('--client_auth_plugin', required=False, help='Client authentication plugin') + parser.add_argument('--client_auth_params', required=False, help='Client authentication params') + parser.add_argument('--use_tls', required=False, help='Use tls') + parser.add_argument('--tls_allow_insecure_connection', required=False, help='Tls allow insecure connection') + parser.add_argument('--hostname_verification_enabled', required=False, help='Enable hostname verification') + parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust cert file path') parser.add_argument('--port', required=True, help='Instance Port', type=int) parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples') parser.add_argument('--user_config', required=False, help='User Config') @@ -124,7 +131,19 @@ def main(): if args.user_config != None and len(args.user_config) != 0: function_details.userConfig = args.user_config - pulsar_client = pulsar.Client(args.pulsar_serviceurl) + authentication = None + use_tls = False + tls_allow_insecure_connection = False + tls_trust_cert_path = None + if args.client_auth_plugin and args.client_auth_params: + authentication = pulsar.Authentication(args.client_auth_plugin, args.client_auth_params) + if args.use_tls == "true": + use_tls = True + if args.tls_allow_insecure_connection == "true": + tls_allow_insecure_connection = True + if args.tls_trust_cert_path: + tls_trust_cert_path = args.tls_trust_cert_path + pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection) pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id), str(args.function_version), function_details, int(args.max_buffered_tuples), str(args.py), diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index b602a1d..554f9ac 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -30,6 +30,8 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; @@ -39,7 +41,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; -import java.lang.reflect.Type; import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ExecutionException; @@ -82,7 +83,25 @@ public class JavaInstanceMain implements AutoCloseable { @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true) protected String pulsarServiceUrl; - + + @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n") + protected String clientAuthenticationPlugin; + + @Parameter(names = "--client_auth_params", description = "Client auth param\n") + protected String clientAuthenticationParameters; + + @Parameter(names = "--use_tls", description = "Use tls connection\n") + protected String useTls = Boolean.FALSE.toString(); + + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + protected String tlsAllowInsecureConnection = Boolean.TRUE.toString(); + + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString(); + + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + protected String tlsTrustCertFilePath; + @Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false) protected String stateStorageServiceUrl; @@ -96,7 +115,7 @@ public class JavaInstanceMain implements AutoCloseable { protected String userConfig; @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") - protected String autoAck = "true"; + protected String autoAck = Boolean.TRUE.toString(); @Parameter(names = "--source_classname", description = "The source classname") protected String sourceClassname; @@ -158,11 +177,7 @@ public class JavaInstanceMain implements AutoCloseable { functionDetailsBuilder.setLogTopic(logTopic); } functionDetailsBuilder.setProcessingGuarantees(processingGuarantees); - if (autoAck.equals("true")) { - functionDetailsBuilder.setAutoAck(true); - } else { - functionDetailsBuilder.setAutoAck(false); - } + functionDetailsBuilder.setAutoAck(isTrue(autoAck)); if (userConfig != null && !userConfig.isEmpty()) { functionDetailsBuilder.setUserConfig(userConfig); } @@ -207,10 +222,13 @@ public class JavaInstanceMain implements AutoCloseable { instanceConfig.setFunctionDetails(functionDetails); instanceConfig.setPort(port); - ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory( - "LocalRunnerThreadGroup", - pulsarServiceUrl, - stateStorageServiceUrl); + ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl, + stateStorageServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin) + .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls)) + .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection)) + .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled)) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build()); runtimeSpawner = new RuntimeSpawner( instanceConfig, jarFile, @@ -257,6 +275,10 @@ public class JavaInstanceMain implements AutoCloseable { close(); } + private static boolean isTrue(String param) { + return Boolean.TRUE.toString().equals(param); + } + public static void main(String[] args) throws Exception { JavaInstanceMain javaInstanceMain = new JavaInstanceMain(); JCommander jcommander = new JCommander(javaInstanceMain); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 715eab6..e94e39d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -30,6 +30,9 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -66,17 +69,20 @@ class ProcessRuntime implements Runtime { String instanceFile, String logDirectory, String codeFile, - String pulsarServiceUrl) { + String pulsarServiceUrl, + AuthenticationConfig authConfig) { this.instanceConfig = instanceConfig; this.instancePort = instanceConfig.getPort(); - this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl); + this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, + authConfig); } private List<String> composeArgs(InstanceConfig instanceConfig, String instanceFile, String logDirectory, String codeFile, - String pulsarServiceUrl) { + String pulsarServiceUrl, + AuthenticationConfig authConfig) { List<String> args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("java"); @@ -135,6 +141,25 @@ class ProcessRuntime implements Runtime { args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees())); args.add("--pulsar_serviceurl"); args.add(pulsarServiceUrl); + if (authConfig != null) { + if (isNotBlank(authConfig.getClientAuthenticationPlugin()) + && isNotBlank(authConfig.getClientAuthenticationParameters())) { + args.add("--client_auth_plugin"); + args.add(authConfig.getClientAuthenticationPlugin()); + args.add("--client_auth_params"); + args.add(authConfig.getClientAuthenticationParameters()); + } + args.add("--use_tls"); + args.add(Boolean.toString(authConfig.isUseTls())); + args.add("--tls_allow_insecure"); + args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection())); + args.add("--hostname_verification_enabled"); + args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable())); + if(isNotBlank(authConfig.getTlsTrustCertsFilePath())) { + args.add("--tls_trust_cert_path"); + args.add(authConfig.getTlsTrustCertsFilePath()); + } + } args.add("--max_buffered_tuples"); args.add(String.valueOf(instanceConfig.getMaxBufferedTuples())); String userConfig = instanceConfig.getFunctionDetails().getUserConfig(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java index 2223cb7..8fc5b90 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java @@ -21,6 +21,8 @@ package org.apache.pulsar.functions.runtime; import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import java.nio.file.Paths; @@ -32,17 +34,20 @@ import java.nio.file.Paths; public class ProcessRuntimeFactory implements RuntimeFactory { private String pulsarServiceUrl; + private AuthenticationConfig authConfig; private String javaInstanceJarFile; private String pythonInstanceFile; private String logDirectory; @VisibleForTesting public ProcessRuntimeFactory(String pulsarServiceUrl, + AuthenticationConfig authConfig, String javaInstanceJarFile, String pythonInstanceFile, String logDirectory) { this.pulsarServiceUrl = pulsarServiceUrl; + this.authConfig = authConfig; this.javaInstanceJarFile = javaInstanceJarFile; this.pythonInstanceFile = pythonInstanceFile; this.logDirectory = logDirectory; @@ -100,7 +105,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory { instanceFile, logDirectory, codeFile, - pulsarServiceUrl); + pulsarServiceUrl, + authConfig); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java index df89eed..f9c9cff 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java @@ -23,8 +23,13 @@ import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl; @@ -41,26 +46,40 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private final String storageServiceUrl; private volatile boolean closed; - public ThreadRuntimeFactory(String threadGroupName, - String pulsarServiceUrl, - String storageServiceUrl) - throws Exception { - this( - threadGroupName, - pulsarServiceUrl != null ? PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null, - storageServiceUrl); + public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, + AuthenticationConfig authConfig) throws Exception { + this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl); } @VisibleForTesting - ThreadRuntimeFactory(String threadGroupName, - PulsarClient pulsarClient, - String storageServiceUrl) { + ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) { this.fnCache = new FunctionCacheManagerImpl(); this.threadGroup = new ThreadGroup(threadGroupName); this.pulsarClient = pulsarClient; this.storageServiceUrl = storageServiceUrl; } + private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) + throws PulsarClientException { + ClientBuilder clientBuilder = null; + if (isNotBlank(pulsarServiceUrl)) { + clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl); + if (authConfig != null) { + if (isNotBlank(authConfig.getClientAuthenticationPlugin()) + && isNotBlank(authConfig.getClientAuthenticationParameters())) { + clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(), + authConfig.getClientAuthenticationParameters()); + } + clientBuilder.enableTls(authConfig.isUseTls()); + clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection()); + clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable()); + clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath()); + } + return clientBuilder.build(); + } + return null; + } + @Override public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile) { return new ThreadRuntime( diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 65e8d89..675f3de 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -66,7 +66,7 @@ public class ProcessRuntimeTest { this.pulsarServiceUrl = "pulsar://localhost:6670"; this.logDirectory = "Users/user/logs"; this.factory = new ProcessRuntimeFactory( - pulsarServiceUrl, javaInstanceJarFile, pythonInstanceFile, logDirectory); + pulsarServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory); } @AfterMethod diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index af23992..ba1a33d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate; import org.apache.pulsar.functions.runtime.RuntimeFactory; @@ -92,14 +93,23 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); + AuthenticationConfig authConfig = AuthenticationConfig.builder() + .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()) + .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()) + .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()) + .useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()) + .tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build(); + if (workerConfig.getThreadContainerFactory() != null) { this.runtimeFactory = new ThreadRuntimeFactory( workerConfig.getThreadContainerFactory().getThreadGroupName(), workerConfig.getPulsarServiceUrl(), - workerConfig.getStateStorageServiceUrl()); + workerConfig.getStateStorageServiceUrl(), + authConfig); } else if (workerConfig.getProcessContainerFactory() != null) { this.runtimeFactory = new ProcessRuntimeFactory( workerConfig.getPulsarServiceUrl(), + authConfig, workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(), workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), workerConfig.getProcessContainerFactory().getLogDirectory()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index dde017f..de47a33 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -280,7 +280,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { private PulsarAdmin getPulsarAdminClient() { if (this.pulsarAdminClient == null) { - this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); + this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(), + workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), + workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection()); } return this.pulsarAdminClient; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 51de807..576257b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -35,7 +35,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.UUID; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.distributedlog.AppendOnlyStreamWriter; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.DistributedLogManager; @@ -44,6 +44,7 @@ import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; @@ -212,9 +213,17 @@ public final class Utils { return dlogUri; } - public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) { + public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams, String tlsTrustCertsFilePath, boolean allowTlsInsecureConnection) { try { - return PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl).build(); + PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl); + if (isNotBlank(authPlugin) && isNotBlank(authParams)) { + adminBuilder.authentication(authPlugin, authParams); + } + if (isNotBlank(tlsTrustCertsFilePath)) { + adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); + } + adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection); + return adminBuilder.build(); } catch (PulsarClientException e) { log.error("Error creating pulsar admin client", e); throw new RuntimeException(e); @@ -235,7 +244,7 @@ public final class Utils { } public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return StringUtils.isNotBlank(functionPkgUrl) + return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 4f700ab..6e87b94 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -71,7 +71,9 @@ public class Worker extends AbstractService { private static URI initialize(WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException, IOException { // initializing pulsar functions namespace - PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl()); + PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), + workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), + workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection()); InternalConfigurationData internalConf; // make sure pulsar broker is up log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 535a7dd..efbd0fd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -63,7 +63,13 @@ public class WorkerConfig implements Serializable { private int initialBrokerReconnectMaxRetries; private int assignmentWriteMaxRetries; private long instanceLivenessCheckFreqMs; - + private String clientAuthenticationPlugin; + private String clientAuthenticationParameters; + private boolean useTls = false; + private String tlsTrustCertsFilePath = ""; + private boolean tlsAllowInsecureConnection = false; + private boolean tlsHostnameVerificationEnable = false; + @Data @Setter @Getter diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index dfe5834..4740d69 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -23,9 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URI; import lombok.Getter; import lombok.extern.slf4j.Slf4j; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -77,7 +80,17 @@ public class WorkerService { // initialize the function metadata manager try { - this.client = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build(); + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()); + if (isNotBlank(workerConfig.getClientAuthenticationPlugin()) + && isNotBlank(workerConfig.getClientAuthenticationParameters())) { + clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(), + workerConfig.getClientAuthenticationParameters()); + } + clientBuilder.enableTls(workerConfig.isUseTls()); + clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection()); + clientBuilder.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()); + clientBuilder.enableTlsHostnameVerification(workerConfig.isTlsHostnameVerificationEnable()); + this.client = clientBuilder.build(); log.info("Created Pulsar client"); //create scheduler manager -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.