This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 95d4bf7  Add authorization support on function apis (#2213)
95d4bf7 is described below

commit 95d4bf7c2cd52448b5e482f6fac44b24c088d554
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Jul 23 20:18:25 2018 -0700

    Add authorization support on function apis (#2213)
    
    * Add authorization support on function apis
    
    * fix authorization enable check
---
 conf/functions_worker.yml                          |  8 +++
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 11 ++--
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 41 ++++++++++++++-
 pulsar-functions/worker/pom.xml                    |  6 +++
 .../pulsar/functions/worker/WorkerConfig.java      | 22 +++++++-
 .../pulsar/functions/worker/WorkerService.java     | 17 +++++-
 .../functions/worker/rest/FunctionApiResource.java | 12 +++++
 .../pulsar/functions/worker/rest/WorkerServer.java | 15 ++++++
 .../functions/worker/rest/api/FunctionsImpl.java   | 61 ++++++++++++++++++++--
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 14 +++--
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 25 +++++----
 11 files changed, 203 insertions(+), 29 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58bcf1d..4194337 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
 metricsSamplingPeriodSec: 60
+# Enforce authentication
+authenticationEnabled: false
+# Enforce authorization on accessing functions api
+authorizationEnabled: false
+# Set of authentication provider names, given as a list of class names
+authenticationProviders: 
+# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
+superUserRoles: 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 6338ce9..f97f180 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,8 +82,8 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
                                      final @FormDataParam("url") String 
functionPkgUrl,
                                      final @FormDataParam("functionDetails") 
String functionDetailsJson) {
 
-        return functions.registerFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, 
functionPkgUrl, functionDetailsJson);
+        return functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
     }
 
     @PUT
@@ -103,8 +103,8 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
                                    final @FormDataParam("url") String 
functionPkgUrl,
                                    final @FormDataParam("functionDetails") 
String functionDetailsJson) {
 
-        return functions.updateFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, 
functionPkgUrl, functionDetailsJson);
+        return functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -122,8 +122,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
     public Response deregisterFunction(final @PathParam("tenant") String 
tenant,
                                        final @PathParam("namespace") String 
namespace,
                                        final @PathParam("functionName") String 
functionName) {
-        return functions.deregisterFunction(
-            tenant, namespace, functionName);
+        return functions.deregisterFunction(tenant, namespace, functionName, 
clientAppId());
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index a7b34ae..16f1a76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
@@ -99,7 +100,6 @@ public class PulsarSinkE2ETest {
     PulsarAdmin admin;
     PulsarClient pulsarClient;
     BrokerStats brokerStatsClient;
-    WorkerServer functionsWorkerServer;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
@@ -120,6 +120,11 @@ public class PulsarSinkE2ETest {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarSinkE2ETest.class);
 
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @BeforeMethod
     void setup(Method method) throws Exception {
 
@@ -187,6 +192,7 @@ public class PulsarSinkE2ETest {
         pulsarClient = clientBuilder.build();
        
         TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
        
@@ -231,6 +237,9 @@ public class PulsarSinkE2ETest {
         workerConfig.setUseTls(true);
         workerConfig.setTlsAllowInsecureConnection(true);
         workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+        
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
 
         return new WorkerService(workerConfig);
     }
@@ -416,4 +425,34 @@ public class PulsarSinkE2ETest {
 
         return functionDetailsBuilder.build();
     }
+    
+    @Test(dataProvider = "validRoleName")
+    public void testAuthorization(boolean validRoleName) throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String roleName = validRoleName ? "superUser" : "invalid";
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add(roleName);
+        
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                sinkTopic, subscriptionName);
+        try {
+            admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+            Assert.assertTrue(validRoleName);
+        } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) 
{
+            Assert.assertFalse(validRoleName);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index f55781e..bec8818 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -36,6 +36,12 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-runtime</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 09f54ef..eda9b15 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,14 +20,18 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Sets;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -42,7 +46,7 @@ import lombok.experimental.Accessors;
 @EqualsAndHashCode
 @ToString
 @Accessors(chain = true)
-public class WorkerConfig implements Serializable {
+public class WorkerConfig implements Serializable, PulsarConfiguration {
 
     private static final long serialVersionUID = 1L;
 
@@ -74,6 +78,17 @@ public class WorkerConfig implements Serializable {
     private boolean tlsAllowInsecureConnection = false;
     private boolean tlsHostnameVerificationEnable = false;
     private int metricsSamplingPeriodSec = 60;
+    // Enforce authentication
+    private boolean authenticationEnabled = false;
+    // Authentication provider name list, which is a list of class names
+    private Set<String> authenticationProviders = Sets.newTreeSet();
+    // Enforce authorization on accessing functions admin-api
+    private boolean authorizationEnabled = false;
+    // Role names that are treated as "super-user", meaning they will be able to access any admin-api
+    private Set<String> superUserRoles = Sets.newTreeSet();
+    
+    private Properties properties = new Properties();
+
 
     @Data
     @Setter
@@ -135,4 +150,9 @@ public class WorkerConfig implements Serializable {
             throw new IllegalStateException("Failed to resolve localhost 
name.", ex);
         }
     }
+  
+    @Override
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 29dff53..6af9c8f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -35,9 +35,12 @@ import static 
org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -57,8 +60,9 @@ public class WorkerService {
     private SchedulerManager schedulerManager;
     private boolean isInitialized = false;
     private final ScheduledExecutorService statsUpdater;
-
+    private AuthenticationService authenticationService;
     private ConnectorsManager connectorsManager;
+    private PulsarAdmin admin;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
@@ -68,6 +72,11 @@ public class WorkerService {
 
     public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
+        
+        this.admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+                workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
+                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
+        
         try {
             log.info("Worker Configs: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()
                     .writeValueAsString(workerConfig));
@@ -128,6 +137,8 @@ public class WorkerService {
 
             // initialize function metadata manager
             this.functionMetaDataManager.initialize();
+            
+            authenticationService = new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
 
             // Starting cluster services
             log.info("Start cluster services...");
@@ -200,6 +211,10 @@ public class WorkerService {
         if (null != schedulerManager) {
             schedulerManager.close();
         }
+        
+        if (null != this.admin) {
+            this.admin.close();
+        }
     }
 
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 1c5c739..4673d56 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import java.util.Optional;
 import java.util.function.Supplier;
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Context;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 
@@ -32,6 +36,8 @@ public class FunctionApiResource implements 
Supplier<WorkerService> {
     private WorkerService workerService;
     @Context
     protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
 
     public FunctionApiResource() {
         this.functions = new FunctionsImpl(this);
@@ -44,4 +50,10 @@ public class FunctionApiResource implements 
Supplier<WorkerService> {
         }
         return this.workerService;
     }
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) 
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 524a6ad..a57a952 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,8 +19,15 @@
 package org.apache.pulsar.functions.worker.rest;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+
+import javax.servlet.DispatcherType;
+
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.eclipse.jetty.server.Handler;
@@ -28,9 +35,11 @@ import org.eclipse.jetty.server.Server;
 
 import java.net.BindException;
 import java.net.URI;
+
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.glassfish.jersey.server.ResourceConfig;
@@ -41,6 +50,7 @@ public class WorkerServer implements Runnable {
 
     private final WorkerConfig workerConfig;
     private final WorkerService workerService;
+    private static final String MATCH_ALL = "/*";
 
     private static String getErrorMessage(Server server, int port, Exception 
ex) {
         if (ex instanceof BindException) {
@@ -106,7 +116,12 @@ public class WorkerServer implements Runnable {
         final ServletHolder apiServlet =
                 new ServletHolder(new ServletContainer(config));
         contextHandler.addServlet(apiServlet, "/*");
+        if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
+            FilterHolder filter = new FilterHolder(new 
AuthenticationFilter(workerService.getAuthenticationService()));
+            contextHandler.addFilter(filter, MATCH_ALL, 
EnumSet.allOf(DispatcherType.class));
+        }
 
         return contextHandler;
     }
+    
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 2ac48dc..b935bf5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -57,12 +57,14 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -117,11 +119,24 @@ public class FunctionsImpl {
 
     public Response registerFunction(final String tenant, final String 
namespace, final String functionName,
             final InputStream uploadedInputStream, final 
FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson) {
+            final String functionPkgUrl, final String functionDetailsJson, 
final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
+        
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register function", tenant, namespace, functionName,
+                        clientRole);
+                return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to 
perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
 
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
@@ -168,12 +183,25 @@ public class FunctionsImpl {
 
     public Response updateFunction(final String tenant, final String 
namespace, final String functionName,
             final InputStream uploadedInputStream, final 
FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson) {
+            final String functionPkgUrl, final String functionDetailsJson, 
final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update function", tenant, namespace,
+                        functionName, clientRole);
+                return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to 
perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+        
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
         // validate parameters
@@ -217,12 +245,26 @@ public class FunctionsImpl {
                 : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStream);
     }
 
-    public Response deregisterFunction(final String tenant, final String 
namespace, final String functionName) {
+    public Response deregisterFunction(final String tenant, final String 
namespace, final String functionName,
+            String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, clientRole)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
deregister function", tenant, namespace,
+                        functionName, clientRole);
+                return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("client is not authorize to 
perform operation")).build();
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+        
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -893,5 +935,18 @@ public class FunctionsImpl {
         return String.format("%s/%s/%s/%s", tenant, namespace, 
Codec.encode(functionName),
                 Utils.getUniquePackageName(Codec.encode(fileName)));
     }
+    
+    private boolean isAuthorizedRole(String tenant, String clientRole) throws 
PulsarAdminException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+            // skip authorization if client role is super-user
+            if (clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
+                return true;
+            }
+            TenantInfo tenantInfo = 
worker().getAdmin().tenants().getTenantInfo(tenant);
+            return clientRole != null && (tenantInfo.getAdminRoles() == null 
|| tenantInfo.getAdminRoles().isEmpty()
+                    || tenantInfo.getAdminRoles().contains(clientRole));
+        }
+        return true;
+    }
 
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index a23c41b..92957b3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -62,8 +62,8 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
                                      final @FormDataParam("url") String 
functionPkgUrl,
                                      final @FormDataParam("functionDetails") 
String functionDetailsJson) {
 
-        return functions.registerFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, 
functionPkgUrl, functionDetailsJson);
+        return functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -78,8 +78,8 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
                                    final @FormDataParam("url") String 
functionPkgUrl,
                                    final @FormDataParam("functionDetails") 
String functionDetailsJson) {
 
-        return functions.updateFunction(
-            tenant, namespace, functionName, uploadedInputStream, fileDetail, 
functionPkgUrl, functionDetailsJson);
+        return functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, clientAppId());
 
     }
 
@@ -87,10 +87,8 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
     @DELETE
     @Path("/{tenant}/{namespace}/{functionName}")
     public Response deregisterFunction(final @PathParam("tenant") String 
tenant,
-                                       final @PathParam("namespace") String 
namespace,
-                                       final @PathParam("functionName") String 
functionName) {
-        return functions.deregisterFunction(
-            tenant, namespace, functionName);
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(tenant, namespace, functionName, 
clientAppId());
     }
 
     @GET
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 866b92e..c59d03d 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -314,7 +314,8 @@ public class FunctionApiV2ResourceTest {
                 inputStream,
                 details,
                 null,
-                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+                null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         if (missingFieldName.equals("parallelism")) {
@@ -342,7 +343,8 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
     }
 
     @Test
@@ -587,7 +589,8 @@ public class FunctionApiV2ResourceTest {
             inputStream,
             details,
             null,
-            
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         if (missingFieldName.equals("parallelism")) {
@@ -615,7 +618,8 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
     }
 
     @Test
@@ -696,7 +700,8 @@ public class FunctionApiV2ResourceTest {
             null,
             null,
             filePackageUrl,
-            
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
@@ -783,7 +788,8 @@ public class FunctionApiV2ResourceTest {
         Response response = resource.deregisterFunction(
             tenant,
             namespace,
-            function);
+            function,
+            null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not 
provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -793,7 +799,8 @@ public class FunctionApiV2ResourceTest {
         return resource.deregisterFunction(
             tenant,
             namespace,
-            function);
+            function,
+            null);
     }
 
     @Test
@@ -1043,7 +1050,7 @@ public class FunctionApiV2ResourceTest {
                         
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
                 .build();
         Response response = resource.registerFunction(tenant, namespace, 
function, null, null, filePackageUrl,
-                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
@@ -1068,7 +1075,7 @@ public class FunctionApiV2ResourceTest {
                         
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
                 .build();
         Response response = resource.registerFunction(tenant, namespace, 
function, null, null, filePackageUrl,
-                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+                
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
     }

Reply via email to