This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f44d367 Added cli commands to get function cluster related information (#2426) f44d367 is described below commit f44d367f1bd0de30b4702c174c4697d591f072fc Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Aug 24 23:28:34 2018 -0700 Added cli commands to get function cluster related information (#2426) * Added command line to get cluster/cluster leader/function assignment information. Also refactored such kind of meta requests to a separate endpoint * Removed left-over debug statements * Added back /functionsmetrics * Added /functionsmetrics back to the broker worker * Removed leftover references of getcluster * Fix integration tests * Fixed instantiation of service in worker only mode * Removed log statement * Separated stats calls to a separate endpoint to mimic broker --- .../pulsar/broker/admin/impl/FunctionsBase.java | 44 ----- .../org/apache/pulsar/broker/admin/v2/Worker.java | 94 ++++++++++ .../apache/pulsar/broker/admin/v2/WorkerStats.java | 52 +++--- .../apache/pulsar/io/PulsarFunctionAdminTest.java | 8 - .../org/apache/pulsar/client/admin/Functions.java | 7 - .../apache/pulsar/client/admin/PulsarAdmin.java | 10 +- .../client/admin/{WorkerStats.java => Worker.java} | 26 ++- .../{WorkerStatsImpl.java => WorkerImpl.java} | 54 +++++- ...tionWorkerStats.java => CmdFunctionWorker.java} | 71 +++++++- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 13 -- .../apache/pulsar/admin/cli/PulsarAdminTool.java | 2 +- .../functions/worker/rest/FunctionApiResource.java | 1 - .../pulsar/functions/worker/rest/Resources.java | 5 +- .../pulsar/functions/worker/rest/WorkerServer.java | 4 + .../functions/worker/rest/api/FunctionsImpl.java | 134 +------------- .../functions/worker/rest/api/WorkerImpl.java | 199 +++++++++++++++++++++ .../worker/rest/api/v2/FunctionApiV2Resource.java | 25 --- .../worker/rest/api/v2/WorkerApiV2Resource.java | 108 +++++++++++ ...kerStats.java => 
WorkerStatsApiV2Resource.java} | 75 +++++--- ...est.java => WorkerApiV2ResourceConfigTest.java} | 2 +- .../integration/containers/WorkerContainer.java | 2 +- 21 files changed, 640 insertions(+), 296 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 315027f..def5452 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -198,50 +198,6 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi } - @GET - @ApiOperation( - value = "Fetches information about the Pulsar cluster running Pulsar Functions", - response = WorkerInfo.class, - responseContainer = "List" - ) - @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") - - }) - @Path("/cluster") - @Produces(MediaType.APPLICATION_JSON) - public List<WorkerInfo> getCluster() { - return functions.getCluster(); - } - - @GET - @ApiOperation( - value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions", - response = WorkerInfo.class - ) - @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") - - }) - @Path("/cluster/leader") - public WorkerInfo getClusterLeader() { - return functions.getClusterLeader(); - } - - @GET - @ApiOperation( - value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", - response = Assignment.class, - responseContainer = "Map" - ) - @ApiResponses(value = { - @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") - }) - @Path("/assignments") - public Response getAssignments() { - return functions.getAssignments(); - } - @POST @ApiOperation( value = "Triggers a Pulsar Function with a 
user-specified value or file data", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java new file mode 100644 index 0000000..46427cb --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.v2; + +import java.util.function.Supplier; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.worker.WorkerService; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; + +@Slf4j +@Path("/worker") +public class Worker extends AdminResource implements Supplier<WorkerService> { + + private final WorkerImpl worker; + + public Worker() { + this.worker = new WorkerImpl(this); + } + + @Override + public WorkerService get() { + return pulsar().getWorkerService(); + } + + @GET + @ApiOperation( + value = "Fetches information about the Pulsar cluster running Pulsar Functions" + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + + }) + @Path("/cluster") + @Produces(MediaType.APPLICATION_JSON) + public Response getCluster() { + return worker.getCluster(); + } + + @GET + @ApiOperation( + value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + + }) + @Path("/cluster/leader") + @Produces(MediaType.APPLICATION_JSON) + public Response getClusterLeader() { + return worker.getClusterLeader(); + } + + @GET + @ApiOperation( + value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", + response = Function.Assignment.class, + responseContainer = "Map" + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + 
@Path("/assignments") + public Response getAssignments() { + return worker.getAssignments(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index 5a0e4b7..1a9c7ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -18,39 +18,51 @@ */ package org.apache.pulsar.broker.admin.v2; -import java.io.IOException; -import java.util.Collection; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.core.Response; - -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; - import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Collection; +import java.util.function.Supplier; @Slf4j @Path("/worker-stats") -public class WorkerStats extends FunctionApiResource { +public class WorkerStats extends AdminResource implements Supplier<WorkerService> { - @GET - @Path("/functions") - @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "Worker service is not running") }) - public 
Response getStats() throws IOException { - return functions.getFunctionsMetrcis(clientAppId()); + private final WorkerImpl worker; + + public WorkerStats() { + this.worker = new WorkerImpl(this); + } + + @Override + public WorkerService get() { + return pulsar().getWorkerService(); } - + @GET @Path("/metrics") @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { - return functions.getWorkerMetrcis(clientAppId()); + return worker.getWorkerMetrcis(clientAppId()); + } + + @GET + @Path("/functionsmetrics") + @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") }) + public Response getStats() throws IOException { + return worker.getFunctionsMetrics(clientAppId()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 427976f..a758c87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -209,12 +209,4 @@ public class PulsarFunctionAdminTest { return new WorkerService(workerConfig); } - - @Test - public void testGetWokersApi() throws Exception { - List<WorkerInfo> workers = admin.functions().getCluster(); - Assert.assertEquals(workers.size(), 1); - 
Assert.assertEquals(workers.get(0).getPort(), workerServicePort); - } - } \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index 2fc1796..96e9cbc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -320,11 +320,4 @@ public interface Functions { * */ Set<String> getSinks() throws PulsarAdminException; - - /** - * Get list of workers present under a cluster - * @return - * @throws PulsarAdminException - */ - List<WorkerInfo> getCluster() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index 7a41fe9..a1502b0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.internal.NamespacesImpl; import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl; import org.apache.pulsar.client.admin.internal.SchemasImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl; -import org.apache.pulsar.client.admin.internal.WorkerStatsImpl; +import org.apache.pulsar.client.admin.internal.WorkerImpl; import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl; @@ -85,7 +85,7 @@ public class PulsarAdmin implements Closeable { private final String serviceUrl; private final Lookup lookups; private final Functions functions; - private final WorkerStats workerStats; + private final Worker worker; private final Schemas 
schemas; protected final WebTarget root; protected final Authentication auth; @@ -189,7 +189,7 @@ public class PulsarAdmin implements Closeable { this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.lookups = new LookupImpl(root, auth, useTls); this.functions = new FunctionsImpl(root, auth); - this.workerStats = new WorkerStatsImpl(root, auth); + this.worker = new WorkerImpl(root, auth); this.schemas = new SchemasImpl(root, auth); this.bookies = new BookiesImpl(root, auth); } @@ -361,8 +361,8 @@ public class PulsarAdmin implements Closeable { * * @return the Worker stats */ - public WorkerStats workerStats() { - return workerStats; + public Worker worker() { + return worker; } /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java similarity index 66% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java index 4fc242f..709b713 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java @@ -19,13 +19,16 @@ package org.apache.pulsar.client.admin; import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.worker.WorkerInfo; /** * Admin interface for worker stats management. 
*/ -public interface WorkerStats { +public interface Worker { /** @@ -41,4 +44,25 @@ public interface WorkerStats { * @throws PulsarAdminException */ Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException; + + /** + * Get List of all workers belonging to this cluster + * @return + * @throws PulsarAdminException + */ + List<WorkerInfo> getCluster() throws PulsarAdminException; + + /** + * Get the worker who is the leader of the cluster + * @return + * @throws PulsarAdminException + */ + WorkerInfo getClusterLeader() throws PulsarAdminException; + + /** + * Get the function assignment among the cluster + * @return + * @throws PulsarAdminException + */ + Map<String, Collection<String>> getAssignments() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java similarity index 56% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java index f492d31..4ca46cc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java @@ -20,35 +20,42 @@ package org.apache.pulsar.client.admin.internal; import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson; +import java.lang.reflect.Type; import java.util.Collection; import java.util.List; +import java.util.Map; import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.pulsar.client.admin.PulsarAdminException; -import 
org.apache.pulsar.client.admin.WorkerStats; +import org.apache.pulsar.client.admin.Worker; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.worker.WorkerInfo; @Slf4j -public class WorkerStatsImpl extends BaseResource implements WorkerStats { +public class WorkerImpl extends BaseResource implements Worker { private final WebTarget workerStats; + private final WebTarget worker; - public WorkerStatsImpl(WebTarget web, Authentication auth) { + public WorkerImpl(WebTarget web, Authentication auth) { super(auth); - this.workerStats = web.path("/admin/worker-stats"); + this.worker = web.path("/admin/v2/worker"); + this.workerStats = web.path("/admin/v2/worker-stats"); } @Override public Metrics getFunctionsStats() throws PulsarAdminException { try { - Response response = request(workerStats.path("functions")).get(); + Response response = request(workerStats.path("functionsmetrics")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { throw new ClientErrorException(response); } @@ -71,4 +78,41 @@ public class WorkerStatsImpl extends BaseResource implements WorkerStats { throw getApiException(e); } } + + @Override + public List<WorkerInfo> getCluster() throws PulsarAdminException { + try { + return request(worker.path("cluster")) + .get(new GenericType<List<WorkerInfo>>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public WorkerInfo getClusterLeader() throws PulsarAdminException { + try { + return request(worker.path("cluster").path("leader")) + .get(new GenericType<WorkerInfo>(){}); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public Map<String, Collection<String>> getAssignments() throws PulsarAdminException { + try { + Response response = request(worker.path("assignments")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) 
{ + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); + Type type = new TypeToken<Map<String, Collection<String>>>(){}.getType(); + Map<String, Collection<String>> assignments = new Gson().fromJson(jsonResponse, type); + return assignments; + } catch (Exception e) { + throw getApiException(e); + } + } } \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java similarity index 51% rename from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java rename to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java index 6bd22ae..9122375 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java @@ -30,10 +30,15 @@ import com.google.gson.JsonParser; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.worker.WorkerInfo; + +import java.util.Collection; +import java.util.List; +import java.util.Map; @Slf4j @Parameters(commandDescription = "Operations to collect function-worker statistics") -public class CmdFunctionWorkerStats extends CmdBase { +public class CmdFunctionWorker extends CmdBase { /** * Base command @@ -60,7 +65,7 @@ public class CmdFunctionWorkerStats extends CmdBase { @Override void runCmd() throws Exception { - String json = Utils.printJson(admin.workerStats().getFunctionsStats()); + String json = Utils.printJson(admin.worker().getFunctionsStats()); GsonBuilder gsonBuilder = new GsonBuilder(); if (indent) { gsonBuilder.setPrettyPrinting(); @@ -77,7 +82,7 @@ public class CmdFunctionWorkerStats extends CmdBase { @Override void runCmd() throws Exception { - String json = new 
Gson().toJson(admin.workerStats().getMetrics()); + String json = new Gson().toJson(admin.worker().getMetrics()); GsonBuilder gsonBuilder = new GsonBuilder(); if (indent) { gsonBuilder.setPrettyPrinting(); @@ -86,10 +91,64 @@ public class CmdFunctionWorkerStats extends CmdBase { } } - public CmdFunctionWorkerStats(PulsarAdmin admin) throws PulsarClientException { - super("functions", admin); - jcommander.addCommand("functions", new FunctionsStats()); + @Parameters(commandDescription = "Get all workers belonging to this cluster") + class GetCluster extends BaseCommand { + + @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) + boolean indent = false; + + @Override + void runCmd() throws Exception { + List<WorkerInfo> workers = admin.worker().getCluster(); + GsonBuilder gsonBuilder = new GsonBuilder(); + if (indent) { + gsonBuilder.setPrettyPrinting(); + } + System.out.println(gsonBuilder.create().toJson(workers)); + } + } + + @Parameters(commandDescription = "Get the leader of the worker cluster") + class GetClusterLeader extends BaseCommand { + + @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) + boolean indent = false; + + @Override + void runCmd() throws Exception { + WorkerInfo leader = admin.worker().getClusterLeader(); + GsonBuilder gsonBuilder = new GsonBuilder(); + if (indent) { + gsonBuilder.setPrettyPrinting(); + } + System.out.println(gsonBuilder.create().toJson(leader)); + } + } + + @Parameters(commandDescription = "Get the assignments of the functions accross the worker cluster") + class GetFunctionAssignments extends BaseCommand { + + @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false) + boolean indent = false; + + @Override + void runCmd() throws Exception { + Map<String, Collection<String>> assignments = admin.worker().getAssignments(); + GsonBuilder gsonBuilder = new GsonBuilder(); + if (indent) { + 
gsonBuilder.setPrettyPrinting(); + } + System.out.println(gsonBuilder.create().toJson(assignments)); + } + } + + public CmdFunctionWorker(PulsarAdmin admin) throws PulsarClientException { + super("functions-worker", admin); + jcommander.addCommand("function-stats", new FunctionsStats()); jcommander.addCommand("monitoring-metrics", new CmdMonitoringMetrics()); + jcommander.addCommand("get-cluster", new GetCluster()); + jcommander.addCommand("get-cluster-leader", new GetClusterLeader()); + jcommander.addCommand("get-function-assignments", new GetFunctionAssignments()); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index f31fd82..88412a6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -112,7 +112,6 @@ public class CmdFunctions extends CmdBase { private final TriggerFunction triggerer; private final UploadFunction uploader; private final DownloadFunction downloader; - private final GetCluster cluster; /** * Base command @@ -1107,16 +1106,6 @@ public class CmdFunctions extends CmdBase { } } - @Parameters(commandDescription = "Get list of workers registered in cluster") - class GetCluster extends BaseCommand { - @Override - void runCmd() throws Exception { - String json = (new Gson()).toJson(admin.functions().getCluster()); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - System.out.println(gson.toJson(new JsonParser().parse(json))); - } - } - public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { super("functions", admin); localRunner = new LocalRunner(); @@ -1130,7 +1119,6 @@ public class CmdFunctions extends CmdBase { triggerer = new TriggerFunction(); uploader = new UploadFunction(); downloader = new DownloadFunction(); - cluster = new GetCluster(); restart = new 
RestartFunction(); stop = new StopFunction(); jcommander.addCommand("localrun", getLocalRunner()); @@ -1146,7 +1134,6 @@ public class CmdFunctions extends CmdBase { jcommander.addCommand("trigger", getTriggerer()); jcommander.addCommand("upload", getUploader()); jcommander.addCommand("download", getDownloader()); - jcommander.addCommand("cluster", cluster); } @VisibleForTesting diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index f30b887..b82172b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -99,7 +99,7 @@ public class PulsarAdminTool { commandMap.put("resource-quotas", CmdResourceQuotas.class); commandMap.put("functions", CmdFunctions.class); - commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class); + commandMap.put("functions-worker", CmdFunctionWorker.class); commandMap.put("source", CmdSources.class); commandMap.put("sink", CmdSinks.class); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java index be97986..2456a0d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.worker.rest; -import java.util.Optional; import java.util.function.Supplier; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index 8b73c13..26c7127 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -20,12 +20,11 @@ package org.apache.pulsar.functions.worker.rest; import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; -import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats; +import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; import org.glassfish.jersey.media.multipart.MultiPartFeature; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; public final class Resources { @@ -37,7 +36,7 @@ public final class Resources { return new HashSet<>( Arrays.asList( FunctionApiV2Resource.class, - WorkerStats.class, + WorkerApiV2Resource.class, MultiPartFeature.class )); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 1c26c7d..a091472 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -31,6 +31,8 @@ import java.util.concurrent.Executors; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import 
org.eclipse.jetty.server.ServerConnector; @@ -141,6 +143,8 @@ public class WorkerServer { new ServletContextHandler(ServletContextHandler.NO_SESSIONS); contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService); + contextHandler.setAttribute(WorkerApiV2Resource.ATTRIBUTE_WORKER_SERVICE, workerService); + contextHandler.setAttribute(WorkerStatsApiV2Resource.ATTRIBUTE_WORKERSTATS_SERVICE, workerService); contextHandler.setContextPath(contextPath); final ServletHolder apiServlet = diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index d5f3f57..bbb4f8f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -27,11 +27,7 @@ import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoade import com.google.gson.Gson; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; @@ -39,10 +35,8 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.Base64; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -71,7 +65,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.Codec; -import 
org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; @@ -79,17 +72,10 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics; -import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; -import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; -import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.Utils; -import org.apache.pulsar.functions.worker.WorkerInfo; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.io.core.Sink; @@ -583,48 +569,6 @@ public class FunctionsImpl { return this.worker().getConnectorsManager().getConnectors(); } - public List<WorkerInfo> getCluster() { - if (!isWorkerServiceAvailable()) { - throw new WebApplicationException( - Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Function worker service is not avaialable")).build()); - } - return worker().getMembershipManager().getCurrentMembership(); - } - - public WorkerInfo getClusterLeader() { - if (!isWorkerServiceAvailable()) { - throw new WebApplicationException( - 
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Function worker service is not avaialable")).build()); - } - - MembershipManager membershipManager = worker().getMembershipManager(); - WorkerInfo leader = membershipManager.getLeader(); - - if (leader == null) { - throw new WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR) - .type(MediaType.APPLICATION_JSON).entity(new ErrorData("Leader cannot be determined")).build()); - } - - return leader; - } - - public Response getAssignments() { - - if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); - } - - FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); - Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments(); - Map<String, Collection<String>> ret = new HashMap<>(); - for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) { - ret.put(entry.getKey(), entry.getValue().keySet()); - } - return Response.status(Status.OK).entity(new Gson().toJson(ret)).build(); - } - public Response triggerFunction(final String tenant, final String namespace, final String functionName, final String input, final InputStream uploadedInputStream, final String topic) { @@ -1069,80 +1013,4 @@ public class FunctionsImpl { return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); } - public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException { - if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { - log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); - throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build()); - } - return getWorkerMetrcis(); - } - - private 
List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() { - if (!isWorkerServiceAvailable()) { - throw new WebApplicationException( - Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Function worker service is not avaialable")).build()); - } - return worker().getMetricsGenerator().generate(); - } - - public Response getFunctionsMetrcis(String clientRole) throws IOException { - if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { - log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); - return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("client is not authorize to perform operation")).build(); - } - return getFunctionsMetrcis(); - } - - private Response getFunctionsMetrcis() throws IOException { - if (!isWorkerServiceAvailable()) { - return getUnavailableResponse(); - } - - WorkerService workerService = worker(); - Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager() - .getFunctionRuntimeInfos(); - - Metrics.Builder metricsBuilder = Metrics.newBuilder(); - for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) { - String fullyQualifiedInstanceName = entry.getKey(); - FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); - RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); - - if (functionRuntimeSpawner != null) { - Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); - if (functionRuntime != null) { - try { - InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig() - .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get() - : functionRuntime.getAndResetMetrics().get(); - - String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getTenant(); - String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getNamespace(); - String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .getFunctionDetails().getName(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name); - - InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder(); - instanceBuilder.setName(qualifiedFunctionName); - instanceBuilder.setInstanceId(instanceId); - if (metricsData != null) { - instanceBuilder.setMetricsData(metricsData); - } - metricsBuilder.addMetrics(instanceBuilder.build()); - } catch (InterruptedException | ExecutionException e) { - log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); - } - } - } - } - String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder); - return Response.status(Status.OK).entity(jsonResponse).build(); - } - } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java new file mode 100644 index 0000000..87e9bc2 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api; + +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.worker.*; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import java.io.*; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static com.google.common.base.Preconditions.checkNotNull; + +@Slf4j +public class WorkerImpl { + + private final Supplier<WorkerService> workerServiceSupplier; + + public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) { + this.workerServiceSupplier = workerServiceSupplier; + } + + private WorkerService worker() { + try { + return checkNotNull(workerServiceSupplier.get()); + } catch (Throwable t) { + log.info("Failed to get worker service", t); + throw t; + } + } + + private 
boolean isWorkerServiceAvailable() { + WorkerService workerService = workerServiceSupplier.get(); + if (workerService == null) { + return false; + } + if (!workerService.isInitialized()) { + return false; + } + return true; + } + + public Response getCluster() { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership(); + String jsonString = new Gson().toJson(workers); + return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build(); + } + + public Response getClusterLeader() { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + MembershipManager membershipManager = worker().getMembershipManager(); + WorkerInfo leader = membershipManager.getLeader(); + + if (leader == null) { + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Leader cannot be determined")).build(); + } + + String jsonString = new Gson().toJson(leader); + return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build(); + } + + public Response getAssignments() { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments(); + Map<String, Collection<String>> ret = new HashMap<>(); + for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) { + ret.put(entry.getKey(), entry.getValue().keySet()); + } + return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new Gson().toJson(ret)).build(); + } + + private Response getUnavailableResponse() { + return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData( + "Function worker service is not done 
initializing. " + "Please try again in a little while.")) + .build(); + } + + public boolean isSuperUser(String clientRole) { + return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); + } + + public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis(String clientRole) throws IOException { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); + throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(clientRole + " is not authorize to get metrics")).build()); + } + return getWorkerMetrcis(); + } + + private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() { + if (!isWorkerServiceAvailable()) { + throw new WebApplicationException( + Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Function worker service is not avaialable")).build()); + } + return worker().getMetricsGenerator().generate(); + } + + public Response getFunctionsMetrics(String clientRole) throws IOException { + if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { + log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + } + return getFunctionsMetrics(); + } + + private Response getFunctionsMetrics() throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + WorkerService workerService = worker(); + Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager() + .getFunctionRuntimeInfos(); + + Metrics.Builder metricsBuilder = Metrics.newBuilder(); + for (Map.Entry<String, FunctionRuntimeInfo> 
entry : functionRuntimes.entrySet()) { + String fullyQualifiedInstanceName = entry.getKey(); + FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); + RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + + if (functionRuntimeSpawner != null) { + Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); + if (functionRuntime != null) { + try { + InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig() + .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get() + : functionRuntime.getAndResetMetrics().get(); + + String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getTenant(); + String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getNamespace(); + String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getName(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name); + + InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder(); + instanceBuilder.setName(qualifiedFunctionName); + instanceBuilder.setInstanceId(instanceId); + if (metricsData != null) { + instanceBuilder.setMetricsData(metricsData); + } + metricsBuilder.addMetrics(instanceBuilder.build()); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e); + } + } + } + } + String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder); + return Response.status(Status.OK).entity(jsonResponse).build(); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 
e13f69c..95fe687 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -35,17 +35,13 @@ import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.pulsar.functions.worker.WorkerInfo; - import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; @Slf4j @Path("/functions") @@ -129,27 +125,6 @@ public class FunctionApiV2Resource extends FunctionApiResource { } - @GET - @Path("/cluster") - @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions") - public List<WorkerInfo> getCluster() { - return functions.getCluster(); - } - - @GET - @Path("/cluster/leader") - @Produces(MediaType.APPLICATION_JSON) - public WorkerInfo getClusterLeader() { - return functions.getClusterLeader(); - } - - @GET - @Path("/assignments") - public Response getAssignments() { - return functions.getAssignments(); - } - @POST @Path("/{tenant}/{namespace}/{functionName}/trigger") @Consumes(MediaType.MULTIPART_FORM_DATA) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java new file mode 100644 index 0000000..8cf0797 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api.v2; + +import java.util.function.Supplier; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.worker.WorkerService; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; + +@Slf4j +@Path("/worker") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Api(value = "/worker", description = "Workers admin api", tags = "workers") +public class WorkerApiV2Resource implements Supplier<WorkerService> { + + public static final String ATTRIBUTE_WORKER_SERVICE = "worker"; + + protected final WorkerImpl worker; + private WorkerService workerService; + @Context + protected 
ServletContext servletContext; + @Context + protected HttpServletRequest httpRequest; + + public WorkerApiV2Resource() { + this.worker = new WorkerImpl(this); + } + + @Override + public synchronized WorkerService get() { + if (this.workerService == null) { + this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE); + } + return this.workerService; + } + + public String clientAppId() { + return httpRequest != null + ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName) + : null; + } + + @GET + @Path("/cluster") + @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions") + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) + @Produces(MediaType.APPLICATION_JSON) + public Response getCluster() { + return worker.getCluster(); + } + + @GET + @Path("/cluster/leader") + @ApiOperation(value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions") + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) + @Produces(MediaType.APPLICATION_JSON) + public Response getClusterLeader() { + return worker.getClusterLeader(); + } + + @GET + @Path("/assignments") + @ApiOperation(value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters", + response = Function.Assignment.class, + responseContainer = "Map") + @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "WorkerApiV2Resource service is not running") }) + public Response getAssignments() { + return worker.getAssignments(); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java similarity index 53% rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java index 146bb21..c802da7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java @@ -18,46 +18,77 @@ */ package org.apache.pulsar.functions.worker.rest.api.v2; -import java.io.IOException; -import java.util.Collection; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.WorkerImpl; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.util.Collection; +import java.util.function.Supplier; @Slf4j 
@Path("/worker-stats") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -@Api(value = "/worker-stats", description = "Workers admin api", tags = "workers") -public class WorkerStats extends FunctionApiResource { +@Api(value = "/worker-stats", description = "Workers stats api", tags = "workers-stats") +public class WorkerStatsApiV2Resource implements Supplier<WorkerService> { - @GET - @Path("/functions") - @ApiOperation(value = "Get stats for all functions owned by worker", notes = "Request should be executed by Monitoring agent on each worker to fetch the function-metrics", response = Metrics.class) - @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"), - @ApiResponse(code = 503, message = "Worker service is not running") }) - public Response getStats() throws IOException { - return functions.getFunctionsMetrcis(clientAppId()); + public static final String ATTRIBUTE_WORKERSTATS_SERVICE = "worker-stats"; + + protected final WorkerImpl worker; + private WorkerService workerService; + @Context + protected ServletContext servletContext; + @Context + protected HttpServletRequest httpRequest; + + public WorkerStatsApiV2Resource() { + this.worker = new WorkerImpl(this); + } + + @Override + public synchronized WorkerService get() { + if (this.workerService == null) { + this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKERSTATS_SERVICE); + } + return this.workerService; } - + + public String clientAppId() { + return httpRequest != null + ? 
(String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName) + : null; + } + @GET @Path("/metrics") @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { - return functions.getWorkerMetrcis(clientAppId()); + return worker.getWorkerMetrcis(clientAppId()); + } + + @GET + @Path("/functionsmetrics") + @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "Worker service is not running") }) + public Response getFunctionsMetrics() throws IOException { + return worker.getFunctionsMetrics(clientAppId()); } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java similarity index 98% rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java index b830034..4c6d249 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java @@ -26,7 +26,7 @@ import org.testng.annotations.Test; /** * Unit test of {@link WorkerConfig}. 
*/ -public class WorkerConfigTest { +public class WorkerApiV2ResourceConfigTest { private static final String TEST_NAME = "test-worker-config"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java index dcc0999..947acb7 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java @@ -33,6 +33,6 @@ public class WorkerContainer extends PulsarContainer<WorkerContainer> { "bin/run-functions-worker.sh", -1, BROKER_HTTP_PORT, - "/admin/v2/functions/cluster"); + "/admin/v2/worker/cluster"); } }