This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 23a6622 Cleaning up and improving worker endpoints (#3191) 23a6622 is described below commit 23a6622dd57aac1a02cedb26bdfcc8ef19564886 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Dec 13 21:17:34 2018 -0800 Cleaning up and improving worker endpoints (#3191) --- .../org/apache/pulsar/broker/admin/v2/Worker.java | 53 ++++++++------- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 33 +++++++--- .../org/apache/pulsar/client/admin/Worker.java | 6 +- .../pulsar/client/admin/internal/WorkerImpl.java | 76 +++++++++++----------- .../org/apache/pulsar/admin/cli/CliCommand.java | 8 +++ .../apache/pulsar/admin/cli/CmdFunctionWorker.java | 69 +++----------------- .../policies/data/WorkerFunctionInstanceStats.java | 29 +++++++++ .../functions/worker/rest/api/WorkerImpl.java | 65 +++++++----------- .../worker/rest/api/v2/WorkerApiV2Resource.java | 69 ++++++++++++-------- .../rest/api/v2/WorkerStatsApiV2Resource.java | 34 +++++++--- 10 files changed, 228 insertions(+), 214 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java index 46427cb..a40132d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java @@ -18,24 +18,24 @@ */ package org.apache.pulsar.broker.admin.v2; -import java.util.function.Supplier; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.worker.WorkerService; - import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.common.functions.WorkerInfo; +import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + @Slf4j @Path("/worker") public class Worker extends AdminResource implements Supplier<WorkerService> { @@ -53,42 +53,47 @@ public class Worker extends AdminResource implements Supplier<WorkerService> { @GET @ApiOperation( - value = "Fetches information about the Pulsar cluster running Pulsar Functions" + value = "Fetches information about the Pulsar cluster running Pulsar Functions", + response = WorkerInfo.class, + responseContainer = "List" ) @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") - + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") }) @Path("/cluster") @Produces(MediaType.APPLICATION_JSON) - public Response getCluster() { + public List<WorkerInfo> getCluster() { return worker.getCluster(); } @GET @ApiOperation( - value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions") + value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions", + response = WorkerInfo.class + ) @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") - + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") }) @Path("/cluster/leader") @Produces(MediaType.APPLICATION_JSON) - public Response getClusterLeader() { + public WorkerInfo getClusterLeader() { return worker.getClusterLeader(); } @GET @ApiOperation( value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", - response = Function.Assignment.class, - responseContainer = "Map" + response = Map.class ) @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") }) @Path("/assignments") - public Response getAssignments() { + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Collection<String>> getAssignments() { return worker.getAssignments(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index a0ec3fe..a5146df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -23,15 +23,17 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; import javax.ws.rs.GET; import javax.ws.rs.Path; -import javax.ws.rs.core.Response; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.function.Supplier; @Slf4j @@ -51,18 +53,33 @@ public class WorkerStats extends AdminResource implements Supplier<WorkerService @GET @Path("/metrics") - @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) + @ApiOperation( + value = "Gets the metrics for Monitoring", + notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", + response = org.apache.pulsar.common.stats.Metrics.class, + responseContainer = "List") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) + @Produces(MediaType.APPLICATION_JSON) public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { return worker.getWorkerMetrics(clientAppId()); } @GET @Path("/functionsmetrics") - @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "Worker service is not running") }) - public Response getStats() throws IOException { + @ApiOperation( + value = "Get metrics for all functions owned by worker", + notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", + response = WorkerFunctionInstanceStats.class, + responseContainer = "List") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) + @Produces(MediaType.APPLICATION_JSON) + public List<WorkerFunctionInstanceStats> getStats() throws IOException { return worker.getFunctionsMetrics(clientAppId()); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java index 37e1bac..b21f326 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java @@ -22,8 +22,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.common.functions.WorkerInfo; +import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats; /** * Admin interface for worker stats management. @@ -36,8 +36,8 @@ public interface Worker { * @return * @throws PulsarAdminException */ - Metrics getFunctionsStats() throws PulsarAdminException; - + List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException; + /** * Get worker metrics. * @return diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java index c2a9d13..3df818a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java @@ -18,27 +18,20 @@ */ package org.apache.pulsar.client.admin.internal; -import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson; - -import java.lang.reflect.Type; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Worker; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.common.functions.WorkerInfo; +import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats; import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.Worker; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; - -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.functions.WorkerInfo; +import java.util.Collection; +import java.util.List; +import java.util.Map; @Slf4j public class WorkerImpl extends BaseResource implements Worker { @@ -53,27 +46,28 @@ public class WorkerImpl extends BaseResource implements Worker { } @Override - public Metrics getFunctionsStats() throws PulsarAdminException { + public List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException { try { Response response = request(workerStats.path("functionsmetrics")).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); - } - String jsonResponse = response.readEntity(String.class); - Metrics.Builder metricsBuilder = Metrics.newBuilder(); - mergeJson(jsonResponse, metricsBuilder); - return metricsBuilder.build(); - } catch (Exception e) { - throw getApiException(e); - } - } + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + List<WorkerFunctionInstanceStats> metricsList + = response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {}); + return metricsList; + } catch (Exception e) { + throw getApiException(e); + } + } @Override public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException { try { - return request(workerStats.path("metrics")) - .get(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() { - }); + Response response = request(workerStats.path("metrics")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {}); } catch (Exception e) { throw getApiException(e); } @@ -82,9 +76,11 @@ public class WorkerImpl extends BaseResource implements Worker { @Override public List<WorkerInfo> getCluster() throws PulsarAdminException { try { - return request(worker.path("cluster")) - .get(new GenericType<List<WorkerInfo>>() { - }); + Response response = request(worker.path("cluster")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType<List<WorkerInfo>>() {}); } catch (Exception e) { throw getApiException(e); } @@ -93,8 +89,11 @@ public class WorkerImpl extends BaseResource implements Worker { @Override public WorkerInfo getClusterLeader() throws PulsarAdminException { try { - return request(worker.path("cluster").path("leader")) - .get(new GenericType<WorkerInfo>(){}); + Response response = request(worker.path("cluster").path("leader")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType<WorkerInfo>(){}); } catch (Exception e) { throw getApiException(e); } @@ -107,9 +106,8 @@ public class WorkerImpl extends BaseResource implements Worker { if (!response.getStatusInfo().equals(Response.Status.OK)) { throw new ClientErrorException(response); } - String jsonResponse = response.readEntity(String.class); - Type type = new TypeToken<Map<String, Collection<String>>>(){}.getType(); - Map<String, Collection<String>> assignments = new Gson().fromJson(jsonResponse, type); + Map<String, Collection<String>> assignments + = response.readEntity(new GenericType<Map<String, Collection<String>>>() {}); return assignments; } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java index 99d27d8..7a3d150 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java @@ -175,6 +175,14 @@ abstract class CliCommand { } } + <T> void printList(T item) { + try { + System.out.println(writer.writeValueAsString(item)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + <T> void print(T item) { try { System.out.println(writer.writeValueAsString(item)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java index 324a028..6a166c7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java @@ -18,23 +18,11 @@ */ package org.apache.pulsar.admin.cli; -import com.google.protobuf.util.JsonFormat; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.PulsarClientException; - -import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParser; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.functions.WorkerInfo; - -import java.util.Collection; -import java.util.List; -import java.util.Map; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClientException; @Slf4j @Parameters(commandDescription = "Operations to collect function-worker statistics") @@ -57,88 +45,49 @@ public class CmdFunctionWorker extends CmdBase { abstract void runCmd() throws Exception; } - @Parameters(commandDescription = "dump all functions stats") + @Parameters(commandDescription = "Dump all functions stats running on this broker") class FunctionsStats extends BaseCommand { - @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) - boolean indent = false; - @Override void runCmd() throws Exception { - String json = JsonFormat.printer().print(admin.worker().getFunctionsStats()); - GsonBuilder gsonBuilder = new GsonBuilder(); - if (indent) { - gsonBuilder.setPrettyPrinting(); - } - System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json))); + printList(admin.worker().getFunctionsStats()); } } - @Parameters(commandDescription = "dump metrics for Monitoring") + @Parameters(commandDescription = "Dump metrics for Monitoring") class CmdMonitoringMetrics extends BaseCommand { - @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) - boolean indent = false; - @Override void runCmd() throws Exception { - String json = new Gson().toJson(admin.worker().getMetrics()); - GsonBuilder gsonBuilder = new GsonBuilder(); - if (indent) { - gsonBuilder.setPrettyPrinting(); - } - System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json))); + printList(admin.worker().getMetrics()); } } @Parameters(commandDescription = "Get all workers belonging to this cluster") class GetCluster extends BaseCommand { - @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) - boolean indent = false; - @Override void runCmd() throws Exception { - List<WorkerInfo> workers = admin.worker().getCluster(); - GsonBuilder gsonBuilder = new GsonBuilder(); - if (indent) { - gsonBuilder.setPrettyPrinting(); - } - System.out.println(gsonBuilder.create().toJson(workers)); + printList(admin.worker().getCluster()); } } @Parameters(commandDescription = "Get the leader of the worker cluster") class GetClusterLeader extends BaseCommand { - @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) - boolean indent = false; - @Override void runCmd() throws Exception { - WorkerInfo leader = admin.worker().getClusterLeader(); - GsonBuilder gsonBuilder = new GsonBuilder(); - if (indent) { - gsonBuilder.setPrettyPrinting(); - } - System.out.println(gsonBuilder.create().toJson(leader)); + print(admin.worker().getClusterLeader()); } } @Parameters(commandDescription = "Get the assignments of the functions accross the worker cluster") class GetFunctionAssignments extends BaseCommand { - @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) - boolean indent = false; @Override void runCmd() throws Exception { - Map<String, Collection<String>> assignments = admin.worker().getAssignments(); - GsonBuilder gsonBuilder = new GsonBuilder(); - if (indent) { - gsonBuilder.setPrettyPrinting(); - } - System.out.println(gsonBuilder.create().toJson(assignments)); + print(admin.worker().getAssignments()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java new file mode 100644 index 0000000..86079bb --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/WorkerFunctionInstanceStats.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.Data; +import org.apache.pulsar.common.policies.data.FunctionStats; + +@Data +public class WorkerFunctionInstanceStats { + /** fully qualified function instance name **/ + public String name; + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics; +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index 56c945d..6351272 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -18,25 +18,25 @@ */ package org.apache.pulsar.functions.worker.rest.api; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.FunctionStats; +import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.Utils; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.RestException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -50,8 +50,6 @@ public class WorkerImpl { private final Supplier<WorkerService> workerServiceSupplier; - private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); - public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) { this.workerServiceSupplier = workerServiceSupplier; } @@ -76,36 +74,33 @@ public class WorkerImpl { return true; } - public Response getCluster() { + public List<WorkerInfo> getCluster() { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership(); - String jsonString = new Gson().toJson(workers); - return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build(); + return workers; } - public Response getClusterLeader() { + public WorkerInfo getClusterLeader() { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } MembershipManager membershipManager = worker().getMembershipManager(); WorkerInfo leader = membershipManager.getLeader(); if (leader == null) { - return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Leader cannot be determined")).build(); + throw new RestException(Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined"); } - String jsonString = new Gson().toJson(leader); - return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build(); + return leader; } - public Response getAssignments() { + public Map<String, Collection<String>> getAssignments() { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); @@ -114,14 +109,7 @@ public class WorkerImpl { for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) { ret.put(entry.getKey(), entry.getValue().keySet()); } - return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new Gson().toJson(ret)).build(); - } - - private Response getUnavailableResponse() { - return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData( - "Function worker service is not done initializing. " + "Please try again in a little while.")) - .build(); + return ret; } public boolean isSuperUser(final String clientRole) { @@ -146,32 +134,25 @@ public class WorkerImpl { return worker().getMetricsGenerator().generate(); } - public Response getFunctionsMetrics(final String clientRole) { + public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String clientRole) throws IOException { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); - return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("client is not authorize to perform operation")).build(); + throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation"); } return getFunctionsMetrics(); } - @Data - public static class WorkerFunctionInstanceStats { - /** fully qualified function instance name **/ - public String name; - public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics; - } - - private Response getFunctionsMetrics() { + private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOException { if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } WorkerService workerService = worker(); Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager() .getFunctionRuntimeInfos(); - JsonArray metricsMapList = new JsonArray(); + List<WorkerFunctionInstanceStats> metricsList = new ArrayList<>(functionRuntimes.size()); for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) { String fullyQualifiedInstanceName = entry.getKey(); @@ -184,10 +165,8 @@ public class WorkerImpl { workerFunctionInstanceStats.setName(fullyQualifiedInstanceName); workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics()); - metricsMapList.add(gson.toJsonTree(workerFunctionInstanceStats)); + metricsList.add(workerFunctionInstanceStats); } - String jsonResponse = gson.toJson(metricsMapList); - - return Response.status(Status.OK).entity(jsonResponse).build(); + return metricsList; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java index 8cf0797..29d3ff0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java @@ -18,7 +18,15 @@ */ package org.apache.pulsar.functions.worker.rest.api.v2; -import java.util.function.Supplier; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.common.functions.WorkerInfo; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -28,18 +36,10 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import org.apache.pulsar.broker.web.AuthenticationFilter; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.worker.WorkerService; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; @Slf4j @Path("/worker") @@ -76,33 +76,48 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> { } @GET + @ApiOperation( + value = "Fetches information about the Pulsar cluster running Pulsar Functions", + response = WorkerInfo.class, + responseContainer = "List" + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) @Path("/cluster") - @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions") - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) @Produces(MediaType.APPLICATION_JSON) - public Response getCluster() { + public List<WorkerInfo> getCluster() { return worker.getCluster(); } @GET + @ApiOperation( + value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions", + response = WorkerInfo.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) @Path("/cluster/leader") - @ApiOperation(value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions") - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) @Produces(MediaType.APPLICATION_JSON) - public Response getClusterLeader() { + public WorkerInfo getClusterLeader() { return worker.getClusterLeader(); } @GET + @ApiOperation( + value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", + response = Map.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) @Path("/assignments") - @ApiOperation(value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", - response = Function.Assignment.class, - responseContainer = "Map") - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) - public Response getAssignments() { + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Collection<String>> getAssignments() { return worker.getAssignments(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java index 022ebd3..bd404d6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java @@ -24,7 +24,7 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.web.AuthenticationFilter; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; @@ -36,9 +36,8 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.io.IOException; -import java.util.Collection; +import java.util.List; import java.util.function.Supplier; @Slf4j @@ -77,18 +76,33 @@ public class WorkerStatsApiV2Resource implements Supplier<WorkerService> { @GET @Path("/metrics") - @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) - public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { + @ApiOperation( + value = "Gets the metrics for Monitoring", + notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", + response = org.apache.pulsar.common.stats.Metrics.class, + responseContainer = "List") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) + @Produces(MediaType.APPLICATION_JSON) + public List<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { return worker.getWorkerMetrics(clientAppId()); } @GET @Path("/functionsmetrics") - @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = WorkerImpl.WorkerFunctionInstanceStats.class, responseContainer = "List") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "Worker service is not running") }) - public Response getFunctionsMetrics() throws IOException { + @ApiOperation( + value = "Get metrics for all functions owned by worker", + notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", + response = WorkerFunctionInstanceStats.class, + responseContainer = "List") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") + }) + @Produces(MediaType.APPLICATION_JSON) + public List<WorkerFunctionInstanceStats> getStats() throws IOException { return worker.getFunctionsMetrics(clientAppId()); } }