This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f44d367  Added cli commands to get function cluster related 
information (#2426)
f44d367 is described below

commit f44d367f1bd0de30b4702c174c4697d591f072fc
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Aug 24 23:28:34 2018 -0700

    Added cli commands to get function cluster related information (#2426)
    
    * Added command line to get cluster/cluster leader/function assignment 
information.
    Also refactored such kind of meta requests to a separate endpoint
    
    * Removed left-over debug statements
    
    * Added back /functionsmetrics
    
    * Added /functionsmetrics back to the broker worker
    
    * Removed leftover references of getcluster
    
    * Fix integration tests
    
    * Fixed instantiation of service in worker only mode
    
    * Removed log statement
    
    Separated stats calls to a separate endpoint to mimic broker
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  44 -----
 .../org/apache/pulsar/broker/admin/v2/Worker.java  |  94 ++++++++++
 .../apache/pulsar/broker/admin/v2/WorkerStats.java |  52 +++---
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |   8 -
 .../org/apache/pulsar/client/admin/Functions.java  |   7 -
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  10 +-
 .../client/admin/{WorkerStats.java => Worker.java} |  26 ++-
 .../{WorkerStatsImpl.java => WorkerImpl.java}      |  54 +++++-
 ...tionWorkerStats.java => CmdFunctionWorker.java} |  71 +++++++-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  13 --
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |   2 +-
 .../functions/worker/rest/FunctionApiResource.java |   1 -
 .../pulsar/functions/worker/rest/Resources.java    |   5 +-
 .../pulsar/functions/worker/rest/WorkerServer.java |   4 +
 .../functions/worker/rest/api/FunctionsImpl.java   | 134 +-------------
 .../functions/worker/rest/api/WorkerImpl.java      | 199 +++++++++++++++++++++
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  25 ---
 .../worker/rest/api/v2/WorkerApiV2Resource.java    | 108 +++++++++++
 ...kerStats.java => WorkerStatsApiV2Resource.java} |  75 +++++---
 ...est.java => WorkerApiV2ResourceConfigTest.java} |   2 +-
 .../integration/containers/WorkerContainer.java    |   2 +-
 21 files changed, 640 insertions(+), 296 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 315027f..def5452 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -198,50 +198,6 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
 
     }
 
-    @GET
-    @ApiOperation(
-            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions",
-            response = WorkerInfo.class,
-            responseContainer = "List"
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
-    })
-    @Path("/cluster")
-    @Produces(MediaType.APPLICATION_JSON)
-    public List<WorkerInfo> getCluster() {
-        return functions.getCluster();
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions",
-            response = WorkerInfo.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
-    })
-    @Path("/cluster/leader")
-    public WorkerInfo getClusterLeader() {
-        return functions.getClusterLeader();
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
-            response = Assignment.class,
-            responseContainer = "Map"
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-    })
-    @Path("/assignments")
-    public Response getAssignments() {
-        return functions.getAssignments();
-    }
-
     @POST
     @ApiOperation(
             value = "Triggers a Pulsar Function with a user-specified value or 
file data",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
new file mode 100644
index 0000000..46427cb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import java.util.function.Supplier;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+public class Worker extends AdminResource implements Supplier<WorkerService> {
+
+    private final WorkerImpl worker;
+
+    public Worker() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+
+    })
+    @Path("/cluster")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getCluster() {
+        return worker.getCluster();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+
+    })
+    @Path("/cluster/leader")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getClusterLeader() {
+        return worker.getClusterLeader();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
+            response = Function.Assignment.class,
+            responseContainer = "Map"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+    })
+    @Path("/assignments")
+    public Response getAssignments() {
+        return worker.getAssignments();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 5a0e4b7..1a9c7ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -18,39 +18,51 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
-import java.io.IOException;
-import java.util.Collection;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
 
 @Slf4j
 @Path("/worker-stats")
-public class WorkerStats extends FunctionApiResource {
+public class WorkerStats extends AdminResource implements 
Supplier<WorkerService> {
 
-    @GET
-    @Path("/functions")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getStats() throws IOException {
-        return functions.getFunctionsMetrcis(clientAppId());
+    private final WorkerImpl worker;
+
+    public WorkerStats() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
     }
-    
+
     @GET
     @Path("/metrics")
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
-        return functions.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrcis(clientAppId());
+    }
+
+    @GET
+    @Path("/functionsmetrics")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not 
running") })
+    public Response getStats() throws IOException {
+        return worker.getFunctionsMetrics(clientAppId());
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 427976f..a758c87 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -209,12 +209,4 @@ public class PulsarFunctionAdminTest {
 
         return new WorkerService(workerConfig);
     }
-
-    @Test
-    public void testGetWokersApi() throws Exception {
-        List<WorkerInfo> workers = admin.functions().getCluster();
-        Assert.assertEquals(workers.size(), 1);
-        Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
-    }
-
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 2fc1796..96e9cbc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -320,11 +320,4 @@ public interface Functions {
      *
      */
     Set<String> getSinks() throws PulsarAdminException;
-
-    /**
-     * Get list of workers present under a cluster
-     * @return
-     * @throws PulsarAdminException
-     */
-    List<WorkerInfo> getCluster() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 7a41fe9..a1502b0 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.internal.NamespacesImpl;
 import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
 import org.apache.pulsar.client.admin.internal.SchemasImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
+import org.apache.pulsar.client.admin.internal.WorkerImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
@@ -85,7 +85,7 @@ public class PulsarAdmin implements Closeable {
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
-    private final WorkerStats workerStats;
+    private final Worker worker;
     private final Schemas schemas;
     protected final WebTarget root;
     protected final Authentication auth;
@@ -189,7 +189,7 @@ public class PulsarAdmin implements Closeable {
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
-        this.workerStats = new WorkerStatsImpl(root, auth);
+        this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
     }
@@ -361,8 +361,8 @@ public class PulsarAdmin implements Closeable {
     *
     * @return the Worker stats
     */
-   public WorkerStats workerStats() {
-       return workerStats;
+   public Worker worker() {
+       return worker;
    }
     
     /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
similarity index 66%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index 4fc242f..709b713 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -19,13 +19,16 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerInfo;
 
 /**
  * Admin interface for worker stats management.
  */
-public interface WorkerStats {
+public interface Worker {
     
     
     /**
@@ -41,4 +44,25 @@ public interface WorkerStats {
      * @throws PulsarAdminException
      */
     Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws 
PulsarAdminException;
+
+    /**
+     * Get List of all workers belonging to this cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    List<WorkerInfo> getCluster() throws PulsarAdminException;
+
+    /**
+     * Get the worker who is the leader of the cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    WorkerInfo getClusterLeader() throws PulsarAdminException;
+
+    /**
+     * Get the function assignment among the cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    Map<String, Collection<String>> getAssignments() throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
similarity index 56%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index f492d31..4ca46cc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -20,35 +20,42 @@ package org.apache.pulsar.client.admin.internal;
 
 import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
 
+import java.lang.reflect.Type;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.WorkerStats;
+import org.apache.pulsar.client.admin.Worker;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
 
 @Slf4j
-public class WorkerStatsImpl extends BaseResource implements WorkerStats {
+public class WorkerImpl extends BaseResource implements Worker {
 
     private final WebTarget workerStats;
+    private final WebTarget worker;
 
-    public WorkerStatsImpl(WebTarget web, Authentication auth) {
+    public WorkerImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.workerStats = web.path("/admin/worker-stats");
+        this.worker = web.path("/admin/v2/worker");
+        this.workerStats = web.path("/admin/v2/worker-stats");
     }
 
     @Override
     public Metrics getFunctionsStats() throws PulsarAdminException {
         try {
-            Response response = request(workerStats.path("functions")).get();
+            Response response = 
request(workerStats.path("functionsmetrics")).get();
            if (!response.getStatusInfo().equals(Response.Status.OK)) {
                throw new ClientErrorException(response);
            }
@@ -71,4 +78,41 @@ public class WorkerStatsImpl extends BaseResource implements 
WorkerStats {
             throw getApiException(e);
         }
     }
+
+    @Override
+    public List<WorkerInfo> getCluster() throws PulsarAdminException {
+        try {
+            return request(worker.path("cluster"))
+                    .get(new GenericType<List<WorkerInfo>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public WorkerInfo getClusterLeader() throws PulsarAdminException {
+        try {
+            return request(worker.path("cluster").path("leader"))
+                    .get(new GenericType<WorkerInfo>(){});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Map<String, Collection<String>> getAssignments() throws 
PulsarAdminException {
+        try {
+            Response response = request(worker.path("assignments")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            Type type = new TypeToken<Map<String, 
Collection<String>>>(){}.getType();
+            Map<String, Collection<String>> assignments = new 
Gson().fromJson(jsonResponse, type);
+            return assignments;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
similarity index 51%
rename from 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
rename to 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 6bd22ae..9122375 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -30,10 +30,15 @@ import com.google.gson.JsonParser;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 @Slf4j
 @Parameters(commandDescription = "Operations to collect function-worker 
statistics")
-public class CmdFunctionWorkerStats extends CmdBase {
+public class CmdFunctionWorker extends CmdBase {
 
     /**
      * Base command
@@ -60,7 +65,7 @@ public class CmdFunctionWorkerStats extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            String json = 
Utils.printJson(admin.workerStats().getFunctionsStats());
+            String json = Utils.printJson(admin.worker().getFunctionsStats());
             GsonBuilder gsonBuilder = new GsonBuilder();
             if (indent) {
                 gsonBuilder.setPrettyPrinting();
@@ -77,7 +82,7 @@ public class CmdFunctionWorkerStats extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            String json = new Gson().toJson(admin.workerStats().getMetrics());
+            String json = new Gson().toJson(admin.worker().getMetrics());
             GsonBuilder gsonBuilder = new GsonBuilder();
             if (indent) {
                 gsonBuilder.setPrettyPrinting();
@@ -86,10 +91,64 @@ public class CmdFunctionWorkerStats extends CmdBase {
         }
     }
 
-    public CmdFunctionWorkerStats(PulsarAdmin admin) throws 
PulsarClientException {
-        super("functions", admin);
-        jcommander.addCommand("functions", new FunctionsStats());
+    @Parameters(commandDescription = "Get all workers belonging to this 
cluster")
+    class GetCluster extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            List<WorkerInfo> workers = admin.worker().getCluster();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(workers));
+        }
+    }
+
+    @Parameters(commandDescription = "Get the leader of the worker cluster")
+    class GetClusterLeader extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            WorkerInfo leader = admin.worker().getClusterLeader();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(leader));
+        }
+    }
+
+    @Parameters(commandDescription = "Get the assignments of the functions 
accross the worker cluster")
+    class GetFunctionAssignments extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            Map<String, Collection<String>> assignments = 
admin.worker().getAssignments();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(assignments));
+        }
+    }
+
+    public CmdFunctionWorker(PulsarAdmin admin) throws PulsarClientException {
+        super("functions-worker", admin);
+        jcommander.addCommand("function-stats", new FunctionsStats());
         jcommander.addCommand("monitoring-metrics", new 
CmdMonitoringMetrics());
+        jcommander.addCommand("get-cluster", new GetCluster());
+        jcommander.addCommand("get-cluster-leader", new GetClusterLeader());
+        jcommander.addCommand("get-function-assignments", new 
GetFunctionAssignments());
     }
 
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index f31fd82..88412a6 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -112,7 +112,6 @@ public class CmdFunctions extends CmdBase {
     private final TriggerFunction triggerer;
     private final UploadFunction uploader;
     private final DownloadFunction downloader;
-    private final GetCluster cluster;
 
     /**
      * Base command
@@ -1107,16 +1106,6 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Get list of workers registered in 
cluster")
-    class GetCluster extends BaseCommand {
-        @Override
-        void runCmd() throws Exception {
-            String json = (new Gson()).toJson(admin.functions().getCluster());
-            Gson gson = new GsonBuilder().setPrettyPrinting().create();
-            System.out.println(gson.toJson(new JsonParser().parse(json)));
-        }
-    }
-
     public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         super("functions", admin);
         localRunner = new LocalRunner();
@@ -1130,7 +1119,6 @@ public class CmdFunctions extends CmdBase {
         triggerer = new TriggerFunction();
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
-        cluster = new GetCluster();
         restart = new RestartFunction();
         stop = new StopFunction();
         jcommander.addCommand("localrun", getLocalRunner());
@@ -1146,7 +1134,6 @@ public class CmdFunctions extends CmdBase {
         jcommander.addCommand("trigger", getTriggerer());
         jcommander.addCommand("upload", getUploader());
         jcommander.addCommand("download", getDownloader());
-        jcommander.addCommand("cluster", cluster);
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index f30b887..b82172b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -99,7 +99,7 @@ public class PulsarAdminTool {
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
-        commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
+        commandMap.put("functions-worker", CmdFunctionWorker.class);
         commandMap.put("source", CmdSources.class);
         commandMap.put("sink", CmdSinks.class);
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index be97986..2456a0d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
-import java.util.Optional;
 import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 8b73c13..26c7127 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,12 +20,11 @@ package org.apache.pulsar.functions.worker.rest;
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 public final class Resources {
@@ -37,7 +36,7 @@ public final class Resources {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
-                        WorkerStats.class,
+                        WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 1c26c7d..a091472 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Executors;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -141,6 +143,8 @@ public class WorkerServer {
                 new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
 
         contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
+        contextHandler.setAttribute(WorkerApiV2Resource.ATTRIBUTE_WORKER_SERVICE, workerService);
+        contextHandler.setAttribute(WorkerStatsApiV2Resource.ATTRIBUTE_WORKERSTATS_SERVICE, workerService);
         contextHandler.setContextPath(contextPath);
 
         final ServletHolder apiServlet =
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d5f3f57..bbb4f8f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,11 +27,7 @@ import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoade
 
 import com.google.gson.Gson;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,10 +35,8 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Base64;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +65,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
@@ -79,17 +72,10 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.WorkerInfo;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.io.core.Sink;
@@ -583,48 +569,6 @@ public class FunctionsImpl {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public List<WorkerInfo> getCluster() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not avaialable")).build());
-        }
-        return worker().getMembershipManager().getCurrentMembership();
-    }
-
-    public WorkerInfo getClusterLeader() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not avaialable")).build());
-        }
-
-        MembershipManager membershipManager = worker().getMembershipManager();
-        WorkerInfo leader = membershipManager.getLeader();
-
-        if (leader == null) {
-            throw new WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
-                    .type(MediaType.APPLICATION_JSON).entity(new ErrorData("Leader cannot be determined")).build());
-        }
-
-        return leader;
-    }
-
-    public Response getAssignments() {
-
-        if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
-        Map<String, Collection<String>> ret = new HashMap<>();
-        for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
-            ret.put(entry.getKey(), entry.getValue().keySet());
-        }
-        return Response.status(Status.OK).entity(new Gson().toJson(ret)).build();
-    }
-
     public Response triggerFunction(final String tenant, final String namespace, final String functionName,
             final String input, final InputStream uploadedInputStream, final String topic) {
 
@@ -1069,80 +1013,4 @@ public class FunctionsImpl {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException {
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
-            throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build());
-        }
-        return getWorkerMetrcis();
-    }
-
-    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is not avaialable")).build());
-        }
-        return worker().getMetricsGenerator().generate();
-    }
-
-    public Response getFunctionsMetrcis(String clientRole) throws IOException {
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
-            return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("client is not authorize to perform operation")).build();
-        }
-        return getFunctionsMetrcis();
-    }
-
-    private Response getFunctionsMetrcis() throws IOException {
-        if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
-        }
-
-        WorkerService workerService = worker();
-        Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
-                .getFunctionRuntimeInfos();
-
-        Metrics.Builder metricsBuilder = Metrics.newBuilder();
-        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
-            String fullyQualifiedInstanceName = entry.getKey();
-            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
-
-            if (functionRuntimeSpawner != null) {
-                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
-                if (functionRuntime != null) {
-                    try {
-                        InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig()
-                                .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
-                                        : functionRuntime.getAndResetMetrics().get();
-
-                        String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getTenant();
-                        String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getNamespace();
-                        String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getName();
-                        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                        String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
-
-                        InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
-                        instanceBuilder.setName(qualifiedFunctionName);
-                        instanceBuilder.setInstanceId(instanceId);
-                        if (metricsData != null) {
-                            instanceBuilder.setMetricsData(metricsData);
-                        }
-                        metricsBuilder.addMetrics(instanceBuilder.build());
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
-                    }
-                }
-            }
-        }
-        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
-        return Response.status(Status.OK).entity(jsonResponse).build();
-    }
-
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
new file mode 100644
index 0000000..87e9bc2
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.worker.*;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class WorkerImpl {
+
+    private final Supplier<WorkerService> workerServiceSupplier;
+
+    public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
+        this.workerServiceSupplier = workerServiceSupplier;
+    }
+
+    private WorkerService worker() {
+        try {
+            return checkNotNull(workerServiceSupplier.get());
+        } catch (Throwable t) {
+            log.info("Failed to get worker service", t);
+            throw t;
+        }
+    }
+
+    private boolean isWorkerServiceAvailable() {
+        WorkerService workerService = workerServiceSupplier.get();
+        if (workerService == null) {
+            return false;
+        }
+        if (!workerService.isInitialized()) {
+            return false;
+        }
+        return true;
+    }
+
+    public Response getCluster() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+        List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership();
+        String jsonString = new Gson().toJson(workers);
+        return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+    }
+
+    public Response getClusterLeader() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        MembershipManager membershipManager = worker().getMembershipManager();
+        WorkerInfo leader = membershipManager.getLeader();
+
+        if (leader == null) {
+            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Leader cannot be determined")).build();
+        }
+
+        String jsonString = new Gson().toJson(leader);
+        return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+    }
+
+    public Response getAssignments() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
+        Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
+        Map<String, Collection<String>> ret = new HashMap<>();
+        for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
+            ret.put(entry.getKey(), entry.getValue().keySet());
+        }
+        return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new Gson().toJson(ret)).build();
+    }
+
+    private Response getUnavailableResponse() {
+        return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData(
+                        "Function worker service is not done initializing. " + "Please try again in a little while."))
+                .build();
+    }
+
+    public boolean isSuperUser(String clientRole) {
+        return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+    }
+
+    public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
+            throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build());
+        }
+        return getWorkerMetrcis();
+    }
+
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+        if (!isWorkerServiceAvailable()) {
+            throw new WebApplicationException(
+                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is not avaialable")).build());
+        }
+        return worker().getMetricsGenerator().generate();
+    }
+
+    public Response getFunctionsMetrics(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
+            return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("client is not authorize to perform operation")).build();
+        }
+        return getFunctionsMetrics();
+    }
+
+    private Response getFunctionsMetrics() throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        WorkerService workerService = worker();
+        Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
+                .getFunctionRuntimeInfos();
+
+        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+            if (functionRuntimeSpawner != null) {
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime != null) {
+                    try {
+                        InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig()
+                                .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
+                                : functionRuntime.getAndResetMetrics().get();
+
+                        String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getTenant();
+                        String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getNamespace();
+                        String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getName();
+                        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                        String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
+
+                        InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
+                        instanceBuilder.setName(qualifiedFunctionName);
+                        instanceBuilder.setInstanceId(instanceId);
+                        if (metricsData != null) {
+                            instanceBuilder.setMetricsData(metricsData);
+                        }
+                        metricsBuilder.addMetrics(instanceBuilder.build());
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
+                    }
+                }
+            }
+        }
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        return Response.status(Status.OK).entity(jsonResponse).build();
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index e13f69c..95fe687 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -35,17 +35,13 @@ import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.pulsar.functions.worker.WorkerInfo;
-
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @Path("/functions")
@@ -129,27 +125,6 @@ public class FunctionApiV2Resource extends FunctionApiResource {
 
     }
 
-    @GET
-    @Path("/cluster")
-    @Produces(MediaType.APPLICATION_JSON)
-    @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions")
-    public List<WorkerInfo> getCluster() {
-        return functions.getCluster();
-    }
-
-    @GET
-    @Path("/cluster/leader")
-    @Produces(MediaType.APPLICATION_JSON)
-    public WorkerInfo getClusterLeader() {
-        return functions.getClusterLeader();
-    }
-
-    @GET
-    @Path("/assignments")
-    public Response getAssignments() {
-        return functions.getAssignments();
-    }
-
     @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
new file mode 100644
index 0000000..8cf0797
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import java.util.function.Supplier;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/worker", description = "Workers admin api", tags = "workers")
+public class WorkerApiV2Resource implements Supplier<WorkerService> {
+
+    public static final String ATTRIBUTE_WORKER_SERVICE = "worker";
+
+    protected final WorkerImpl worker;
+    private WorkerService workerService;
+    @Context
+    protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
+
+    public WorkerApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public synchronized WorkerService get() {
+        if (this.workerService == null) {
+            this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE);
+        }
+        return this.workerService;
+    }
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
+
+    @GET
+    @Path("/cluster")
+    @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getCluster() {
+        return worker.getCluster();
+    }
+
+    @GET
+    @Path("/cluster/leader")
+    @ApiOperation(value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getClusterLeader() {
+        return worker.getClusterLeader();
+    }
+
+    @GET
+    @Path("/assignments")
+    @ApiOperation(value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
+            response = Function.Assignment.class,
+            responseContainer = "Map")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") })
+    public Response getAssignments() {
+        return worker.getAssignments();
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
similarity index 53%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index 146bb21..c802da7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -18,46 +18,77 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
-import java.io.IOException;
-import java.util.Collection;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
 
 @Slf4j
 @Path("/worker-stats")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-@Api(value = "/worker-stats", description = "Workers admin api", tags = "workers")
-public class WorkerStats extends FunctionApiResource {
+@Api(value = "/worker-stats", description = "Workers stats api", tags = "workers-stats")
+public class WorkerStatsApiV2Resource implements Supplier<WorkerService> {
 
-    @GET
-    @Path("/functions")
-    @ApiOperation(value = "Get stats for all functions owned by worker", notes = "Request should be executed by Monitoring agent on each worker to fetch the function-metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not running") })
-    public Response getStats() throws IOException {
-        return functions.getFunctionsMetrcis(clientAppId());
+    public static final String ATTRIBUTE_WORKERSTATS_SERVICE = "worker-stats";
+
+    protected final WorkerImpl worker;
+    private WorkerService workerService;
+    @Context
+    protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
+
+    public WorkerStatsApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public synchronized WorkerService get() {
+        if (this.workerService == null) {
+            this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKERSTATS_SERVICE);
+        }
+        return this.workerService;
     }
-    
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
+
     @GET
     @Path("/metrics")
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
-        return functions.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrcis(clientAppId());
+    }
+
+    @GET
+    @Path("/functionsmetrics")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running") })
+    public Response getFunctionsMetrics() throws IOException {
+        return worker.getFunctionsMetrics(clientAppId());
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
similarity index 98%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index b830034..4c6d249 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -26,7 +26,7 @@ import org.testng.annotations.Test;
 /**
  * Unit test of {@link WorkerConfig}.
  */
-public class WorkerConfigTest {
+public class WorkerApiV2ResourceConfigTest {
 
     private static final String TEST_NAME = "test-worker-config";
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
index dcc0999..947acb7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
@@ -33,6 +33,6 @@ public class WorkerContainer extends PulsarContainer<WorkerContainer> {
             "bin/run-functions-worker.sh",
             -1,
             BROKER_HTTP_PORT,
-            "/admin/v2/functions/cluster");
+            "/admin/v2/worker/cluster");
     }
 }

Reply via email to