This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23a6622  Cleaning up and improving worker endpoints (#3191)
23a6622 is described below

commit 23a6622dd57aac1a02cedb26bdfcc8ef19564886
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Dec 13 21:17:34 2018 -0800

    Cleaning up and improving worker endpoints (#3191)
---
 .../org/apache/pulsar/broker/admin/v2/Worker.java  | 53 ++++++++-------
 .../apache/pulsar/broker/admin/v2/WorkerStats.java | 33 +++++++---
 .../org/apache/pulsar/client/admin/Worker.java     |  6 +-
 .../pulsar/client/admin/internal/WorkerImpl.java   | 76 +++++++++++-----------
 .../org/apache/pulsar/admin/cli/CliCommand.java    |  8 +++
 .../apache/pulsar/admin/cli/CmdFunctionWorker.java | 69 +++-----------------
 .../policies/data/WorkerFunctionInstanceStats.java | 29 +++++++++
 .../functions/worker/rest/api/WorkerImpl.java      | 65 +++++++-----------
 .../worker/rest/api/v2/WorkerApiV2Resource.java    | 69 ++++++++++++--------
 .../rest/api/v2/WorkerStatsApiV2Resource.java      | 34 +++++++---
 10 files changed, 228 insertions(+), 214 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 46427cb..a40132d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -18,24 +18,24 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
-import java.util.function.Supplier;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.WorkerService;
-
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
 @Slf4j
 @Path("/worker")
 public class Worker extends AdminResource implements Supplier<WorkerService> {
@@ -53,42 +53,47 @@ public class Worker extends AdminResource implements 
Supplier<WorkerService> {
 
     @GET
     @ApiOperation(
-            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions"
+            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions",
+            response = WorkerInfo.class,
+            responseContainer = "List"
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
     })
     @Path("/cluster")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response getCluster() {
+    public List<WorkerInfo> getCluster() {
         return worker.getCluster();
     }
 
     @GET
     @ApiOperation(
-            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions")
+            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions",
+            response = WorkerInfo.class
+    )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
     })
     @Path("/cluster/leader")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response getClusterLeader() {
+    public WorkerInfo getClusterLeader() {
         return worker.getClusterLeader();
     }
 
     @GET
     @ApiOperation(
             value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
-            response = Function.Assignment.class,
-            responseContainer = "Map"
+            response = Map.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
     })
     @Path("/assignments")
-    public Response getAssignments() {
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Collection<String>> getAssignments() {
         return worker.getAssignments();
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index a0ec3fe..a5146df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -23,15 +23,17 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.function.Supplier;
 
 @Slf4j
@@ -51,18 +53,33 @@ public class WorkerStats extends AdminResource implements 
Supplier<WorkerService
 
     @GET
     @Path("/metrics")
-    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
+    @ApiOperation(
+            value = "Gets the metrics for Monitoring",
+            notes = "Request should be executed by Monitoring agent on each 
worker to fetch the worker-metrics",
+            response = org.apache.pulsar.common.stats.Metrics.class,
+            responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
         return worker.getWorkerMetrics(clientAppId());
     }
 
     @GET
     @Path("/functionsmetrics")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getStats() throws IOException {
+    @ApiOperation(
+            value = "Get metrics for all functions owned by worker",
+            notes = "Requested should be executed by Monitoring agent on each 
worker to fetch the metrics",
+            response = WorkerFunctionInstanceStats.class,
+            responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<WorkerFunctionInstanceStats> getStats() throws IOException {
         return worker.getFunctionsMetrics(clientAppId());
     }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index 37e1bac..b21f326 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -22,8 +22,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 
 /**
  * Admin interface for worker stats management.
@@ -36,8 +36,8 @@ public interface Worker {
      * @return
      * @throws PulsarAdminException 
      */
-    Metrics getFunctionsStats() throws PulsarAdminException;
-    
+    List<WorkerFunctionInstanceStats> getFunctionsStats() throws 
PulsarAdminException;
+
     /**
      * Get worker metrics.
      * @return
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index c2a9d13..3df818a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -18,27 +18,20 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
-import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Worker;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.Worker;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.WorkerInfo;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 @Slf4j
 public class WorkerImpl extends BaseResource implements Worker {
@@ -53,27 +46,28 @@ public class WorkerImpl extends BaseResource implements 
Worker {
     }
 
     @Override
-    public Metrics getFunctionsStats() throws PulsarAdminException {
+    public List<WorkerFunctionInstanceStats> getFunctionsStats() throws 
PulsarAdminException {
         try {
             Response response = 
request(workerStats.path("functionsmetrics")).get();
-           if (!response.getStatusInfo().equals(Response.Status.OK)) {
-               throw new ClientErrorException(response);
-           }
-           String jsonResponse = response.readEntity(String.class);
-           Metrics.Builder metricsBuilder = Metrics.newBuilder();
-           mergeJson(jsonResponse, metricsBuilder);
-           return metricsBuilder.build();
-       } catch (Exception e) {
-           throw getApiException(e);
-       }
-   }
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            List<WorkerFunctionInstanceStats> metricsList
+                    = response.readEntity(new 
GenericType<List<WorkerFunctionInstanceStats>>() {});
+            return metricsList;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 
     @Override
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws PulsarAdminException {
         try {
-            return request(workerStats.path("metrics"))
-                    .get(new 
GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
-                    });
+            Response response = request(workerStats.path("metrics")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new 
GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {});
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -82,9 +76,11 @@ public class WorkerImpl extends BaseResource implements 
Worker {
     @Override
     public List<WorkerInfo> getCluster() throws PulsarAdminException {
         try {
-            return request(worker.path("cluster"))
-                    .get(new GenericType<List<WorkerInfo>>() {
-                    });
+            Response response = request(worker.path("cluster")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<WorkerInfo>>() {});
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -93,8 +89,11 @@ public class WorkerImpl extends BaseResource implements 
Worker {
     @Override
     public WorkerInfo getClusterLeader() throws PulsarAdminException {
         try {
-            return request(worker.path("cluster").path("leader"))
-                    .get(new GenericType<WorkerInfo>(){});
+            Response response = 
request(worker.path("cluster").path("leader")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<WorkerInfo>(){});
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -107,9 +106,8 @@ public class WorkerImpl extends BaseResource implements 
Worker {
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            Type type = new TypeToken<Map<String, 
Collection<String>>>(){}.getType();
-            Map<String, Collection<String>> assignments = new 
Gson().fromJson(jsonResponse, type);
+            Map<String, Collection<String>> assignments
+                    = response.readEntity(new GenericType<Map<String, 
Collection<String>>>() {});
             return assignments;
         } catch (Exception e) {
             throw getApiException(e);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 99d27d8..7a3d150 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -175,6 +175,14 @@ abstract class CliCommand {
         }
     }
 
+    <T> void printList(T item) {
+        try {
+            System.out.println(writer.writeValueAsString(item));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     <T> void print(T item) {
         try {
             System.out.println(writer.writeValueAsString(item));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 324a028..6a166c7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -18,23 +18,11 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import com.google.protobuf.util.JsonFormat;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.WorkerInfo;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClientException;
 
 @Slf4j
 @Parameters(commandDescription = "Operations to collect function-worker 
statistics")
@@ -57,88 +45,49 @@ public class CmdFunctionWorker extends CmdBase {
         abstract void runCmd() throws Exception;
     }
 
-    @Parameters(commandDescription = "dump all functions stats")
+    @Parameters(commandDescription = "Dump all functions stats running on this 
broker")
     class FunctionsStats extends BaseCommand {
 
-        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
-        boolean indent = false;
-
         @Override
         void runCmd() throws Exception {
-            String json = 
JsonFormat.printer().print(admin.worker().getFunctionsStats());
-            GsonBuilder gsonBuilder = new GsonBuilder();
-            if (indent) {
-                gsonBuilder.setPrettyPrinting();
-            }
-            System.out.println(gsonBuilder.create().toJson(new 
JsonParser().parse(json)));
+            printList(admin.worker().getFunctionsStats());
         }
     }
 
-    @Parameters(commandDescription = "dump metrics for Monitoring")
+    @Parameters(commandDescription = "Dump metrics for Monitoring")
     class CmdMonitoringMetrics extends BaseCommand {
 
-        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
-        boolean indent = false;
-
         @Override
         void runCmd() throws Exception {
-            String json = new Gson().toJson(admin.worker().getMetrics());
-            GsonBuilder gsonBuilder = new GsonBuilder();
-            if (indent) {
-                gsonBuilder.setPrettyPrinting();
-            }
-            System.out.println(gsonBuilder.create().toJson(new 
JsonParser().parse(json)));
+            printList(admin.worker().getMetrics());
         }
     }
 
     @Parameters(commandDescription = "Get all workers belonging to this 
cluster")
     class GetCluster extends BaseCommand {
 
-        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
-        boolean indent = false;
-
         @Override
         void runCmd() throws Exception {
-            List<WorkerInfo> workers = admin.worker().getCluster();
-            GsonBuilder gsonBuilder = new GsonBuilder();
-            if (indent) {
-                gsonBuilder.setPrettyPrinting();
-            }
-            System.out.println(gsonBuilder.create().toJson(workers));
+            printList(admin.worker().getCluster());
         }
     }
 
     @Parameters(commandDescription = "Get the leader of the worker cluster")
     class GetClusterLeader extends BaseCommand {
 
-        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
-        boolean indent = false;
-
         @Override
         void runCmd() throws Exception {
-            WorkerInfo leader = admin.worker().getClusterLeader();
-            GsonBuilder gsonBuilder = new GsonBuilder();
-            if (indent) {
-                gsonBuilder.setPrettyPrinting();
-            }
-            System.out.println(gsonBuilder.create().toJson(leader));
+            print(admin.worker().getClusterLeader());
         }
     }
 
     @Parameters(commandDescription = "Get the assignments of the functions 
accross the worker cluster")
     class GetFunctionAssignments extends BaseCommand {
 
-        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
-        boolean indent = false;
 
         @Override
         void runCmd() throws Exception {
-            Map<String, Collection<String>> assignments = 
admin.worker().getAssignments();
-            GsonBuilder gsonBuilder = new GsonBuilder();
-            if (indent) {
-                gsonBuilder.setPrettyPrinting();
-            }
-            System.out.println(gsonBuilder.create().toJson(assignments));
+            print(admin.worker().getAssignments());
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
new file mode 100644
index 0000000..86079bb
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+
+@Data
+public class WorkerFunctionInstanceStats {
+    /** fully qualified function instance name **/
+    public String name;
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
metrics;
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 56c945d..6351272 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -18,25 +18,25 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.RestException;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -50,8 +50,6 @@ public class WorkerImpl {
 
     private final Supplier<WorkerService> workerServiceSupplier;
 
-    private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
     public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
         this.workerServiceSupplier = workerServiceSupplier;
     }
@@ -76,36 +74,33 @@ public class WorkerImpl {
         return true;
     }
 
-    public Response getCluster() {
+    public List<WorkerInfo> getCluster() {
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
         List<WorkerInfo> workers = 
worker().getMembershipManager().getCurrentMembership();
-        String jsonString = new Gson().toJson(workers);
-        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+        return workers;
     }
 
-    public Response getClusterLeader() {
+    public WorkerInfo getClusterLeader() {
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
 
         MembershipManager membershipManager = worker().getMembershipManager();
         WorkerInfo leader = membershipManager.getLeader();
 
         if (leader == null) {
-            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Leader cannot be 
determined")).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader 
cannot be determined");
         }
 
-        String jsonString = new Gson().toJson(leader);
-        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+        return leader;
     }
 
-    public Response getAssignments() {
+    public Map<String, Collection<String>> getAssignments() {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
@@ -114,14 +109,7 @@ public class WorkerImpl {
         for (Map.Entry<String, Map<String, Function.Assignment>> entry : 
assignments.entrySet()) {
             ret.put(entry.getKey(), entry.getValue().keySet());
         }
-        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new 
Gson().toJson(ret)).build();
-    }
-
-    private Response getUnavailableResponse() {
-        return 
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                .entity(new ErrorData(
-                        "Function worker service is not done initializing. " + 
"Please try again in a little while."))
-                .build();
+        return ret;
     }
 
     public boolean isSuperUser(final String clientRole) {
@@ -146,32 +134,25 @@ public class WorkerImpl {
         return worker().getMetricsGenerator().generate();
     }
 
-    public Response getFunctionsMetrics(final String clientRole) {
+    public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String 
clientRole) throws IOException {
+
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
             log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
-            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
         }
         return getFunctionsMetrics();
     }
 
-    @Data
-    public static class WorkerFunctionInstanceStats {
-        /** fully qualified function instance name **/
-        public String name;
-        public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
metrics;
-    }
-
-    private Response getFunctionsMetrics() {
+    private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws 
IOException {
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function 
worker service is not done initializing. Please try again in a little while.");
         }
 
         WorkerService workerService = worker();
         Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
                 .getFunctionRuntimeInfos();
 
-        JsonArray metricsMapList = new JsonArray();
+        List<WorkerFunctionInstanceStats> metricsList = new 
ArrayList<>(functionRuntimes.size());
 
         for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
             String fullyQualifiedInstanceName = entry.getKey();
@@ -184,10 +165,8 @@ public class WorkerImpl {
             workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
             
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
 
-            metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats));
+            metricsList.add(workerFunctionInstanceStats);
         }
-        String jsonResponse = gson.toJson(metricsMapList);
-
-        return Response.status(Status.OK).entity(jsonResponse).build();
+        return metricsList;
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index 8cf0797..29d3ff0 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -18,7 +18,15 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
-import java.util.function.Supplier;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -28,18 +36,10 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
 
 @Slf4j
 @Path("/worker")
@@ -76,33 +76,48 @@ public class WorkerApiV2Resource implements 
Supplier<WorkerService> {
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions",
+            response = WorkerInfo.class,
+            responseContainer = "List"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
     @Path("/cluster")
-    @ApiOperation(value = "Fetches information about the Pulsar cluster 
running Pulsar Functions")
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
     @Produces(MediaType.APPLICATION_JSON)
-    public Response getCluster() {
+    public List<WorkerInfo> getCluster() {
         return worker.getCluster();
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions",
+            response = WorkerInfo.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
     @Path("/cluster/leader")
-    @ApiOperation(value = "Fetches info about the leader node of the Pulsar 
cluster running Pulsar Functions")
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
     @Produces(MediaType.APPLICATION_JSON)
-    public Response getClusterLeader() {
+    public WorkerInfo getClusterLeader() {
         return worker.getClusterLeader();
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
+            response = Map.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
     @Path("/assignments")
-    @ApiOperation(value = "Fetches information about which Pulsar Functions 
are assigned to which Pulsar clusters",
-            response = Function.Assignment.class,
-            responseContainer = "Map")
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
-    public Response getAssignments() {
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Collection<String>> getAssignments() {
         return worker.getAssignments();
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index 022ebd3..bd404d6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -24,7 +24,7 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
@@ -36,9 +36,8 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
 import java.util.function.Supplier;
 
 @Slf4j
@@ -77,18 +76,33 @@ public class WorkerStatsApiV2Resource implements 
Supplier<WorkerService> {
 
     @GET
     @Path("/metrics")
-    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
-    public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
+    @ApiOperation(
+            value = "Gets the metrics for Monitoring",
+            notes = "Request should be executed by Monitoring agent on each 
worker to fetch the worker-metrics",
+            response = org.apache.pulsar.common.stats.Metrics.class,
+            responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<org.apache.pulsar.common.stats.Metrics> getMetrics() throws 
Exception {
         return worker.getWorkerMetrics(clientAppId());
     }
 
     @GET
     @Path("/functionsmetrics")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, 
responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getFunctionsMetrics() throws IOException {
+    @ApiOperation(
+            value = "Get metrics for all functions owned by worker",
+            notes = "Requested should be executed by Monitoring agent on each 
worker to fetch the metrics",
+            response = WorkerFunctionInstanceStats.class,
+            responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<WorkerFunctionInstanceStats> getStats() throws IOException {
         return worker.getFunctionsMetrics(clientAppId());
     }
 }

Reply via email to