This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d804701   Add admin api to support stop function (#2415)
d804701 is described below

commit d804701966d719b132f0bd4a48496dc12aef459e
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Aug 20 23:20:40 2018 -0700

     Add admin api to support stop function (#2415)
    
    ### Motivation
    
    We have a usecase where we want to stop the function temporary to avoid 
draining on that function topics. So, adding an admin api/cli to stop function.
    
    ### Modifications
    
    - add REST/CLI api to stop function.
    
    ### Result
    
    we can stop and then restart function with admin/cli api.
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 25 ++++++++++++++++
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 17 ++++++++++-
 .../org/apache/pulsar/client/admin/Functions.java  | 34 +++++++++++++++++++++
 .../client/admin/internal/FunctionsImpl.java       | 21 +++++++++++++
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 29 ++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 31 ++++++++++++++++++-
 .../functions/worker/FunctionRuntimeManager.java   | 35 +++++++++++++---------
 .../functions/worker/rest/api/FunctionsImpl.java   | 28 +++++++++++++----
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 31 +++++++++++++++----
 9 files changed, 225 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index b8891e5..4384f50 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -293,6 +293,31 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
     }
 
     @POST
+    @ApiOperation(value = "Stop function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, 
instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, 
functionName);
+    }
+
+    @POST
     @ApiOperation(
             value = "Uploads Pulsar Function file data",
             hidden = true
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 1306f13..7302963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -509,7 +509,7 @@ public class PulsarSinkE2ETest {
     }
     
     @Test(timeOut = 20000)
-    public void testFunctionRestartApi() throws Exception {
+    public void testFunctionStopAndRestartApi() throws Exception {
 
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
@@ -543,6 +543,21 @@ public class PulsarSinkE2ETest {
         SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
         assertEquals(subStats.consumers.size(), 1);
 
+        // it should stop consumer : so, check none of the consumer connected 
on subscription
+        admin.functions().stopFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStat = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStat != null && subStat.consumers.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+        assertEquals(subStats.consumers.size(), 0);
+
         // it should restart consumer : so, check if consumer came up again 
after restarting function
         admin.functions().restartFunction(tenant, namespacePortion, 
functionName);
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 4525c51..7d16ff5 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -215,6 +215,40 @@ public interface Functions {
      */
     void restartFunction(String tenant, String namespace, String function) 
throws PulsarAdminException;
 
+    
+    /**
+     * Stop function instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopFunction(String tenant, String namespace, String function, int 
instanceId) throws PulsarAdminException;
+    
+    /**
+     * Stop all function instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopFunction(String tenant, String namespace, String function) throws 
PulsarAdminException;
+    
     /**
      * Triggers the function by writing to the input topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 402b5d3..4552eba 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -237,6 +237,27 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
     }
 
     @Override
+    public void stopFunction(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopFunction(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(functions.path(tenant).path(namespace).path(functionName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public void uploadFunction(String sourceFile, String path) throws 
PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 11a3c7a..97e8633 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -32,6 +32,7 @@ import 
org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
 import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
+import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
 import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
@@ -244,6 +245,34 @@ public class CmdFunctionsTest {
     }
 
     @Test
+    public void stopFunction() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        int instanceId = 0;
+        cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName,
+                "--instance-id", Integer.toString(instanceId)});
+
+        StopFunction stop = cmd.getStopper();
+        assertEquals(fnName, stop.getFunctionName());
+
+        verify(functions, times(1)).stopFunction(tenant, namespace, fnName, 
instanceId);
+    }
+
+    @Test
+    public void stopFunctionInstances() throws Exception {
+        String fnName = TEST_NAME + "-function";
+        String tenant = "sample";
+        String namespace = "ns1";
+        cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", 
namespace, "--name", fnName });
+
+        StopFunction stop = cmd.getStopper();
+        assertEquals(fnName, stop.getFunctionName());
+
+        verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
+    }
+    
+    @Test
     public void testCreateFunctionWithHttpUrl() throws Exception {
         String fnName = TEST_NAME + "-function";
         String inputTopicName = TEST_NAME + "-input-topic";
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5f52313..a23bc7b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -104,6 +104,7 @@ public class CmdFunctions extends CmdBase {
     private final GetFunction getter;
     private final GetFunctionStatus functionStatus;
     private final RestartFunction restart;
+    private final StopFunction stop;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
     private final TriggerFunction triggerer;
@@ -853,7 +854,28 @@ public class CmdFunctions extends CmdBase {
             System.out.println("Restarted successfully");
         }
     }
-    
+
+    @Parameters(commandDescription = "Temporary stops function instance. (If 
worker restarts then it reassigns and starts functiona again")
+    class StopFunction extends FunctionCommand {
+        
+        @Parameter(names = "--instance-id", description = "The function 
instanceId (stop all instances if instance-id is not provided")
+        protected String instanceId;
+        
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.functions().stopFunction(tenant, namespace, 
functionName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                }
+            } else {
+                admin.functions().stopFunction(tenant, namespace, 
functionName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+
     @Parameters(commandDescription = "Delete a Pulsar Function that's running 
on a Pulsar cluster")
     class DeleteFunction extends FunctionCommand {
         @Override
@@ -1066,12 +1088,14 @@ public class CmdFunctions extends CmdBase {
         downloader = new DownloadFunction();
         cluster = new GetCluster();
         restart = new RestartFunction();
+        stop = new StopFunction();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
         jcommander.addCommand("update", getUpdater());
         jcommander.addCommand("get", getGetter());
         jcommander.addCommand("restart", getRestarter());
+        jcommander.addCommand("stop", getStopper());
         jcommander.addCommand("getstatus", getStatuser());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
@@ -1139,6 +1163,11 @@ public class CmdFunctions extends CmdBase {
         return restart;
     }
 
+    @VisibleForTesting
+    StopFunction getStopper() {
+        return stop;
+    }
+
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig 
functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 121a454..1016171 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -90,7 +90,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     private MembershipManager membershipManager;
     private final ConnectorsManager connectorsManager;
-
+    
     public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   PulsarClient pulsarClient,
                                   Namespace dlogNamespace,
@@ -326,7 +326,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return functionStatus;
     }
 
-    public Response restartFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId) throws Exception {
+    public Response stopFunctionInstance(String tenant, String namespace, 
String functionName, int instanceId,
+            boolean restart) throws Exception {
         Assignment assignment = this.findAssignment(tenant, namespace, 
functionName, instanceId);
         final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, 
namespace, functionName, instanceId);
         if (assignment == null) {
@@ -336,9 +337,9 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         final String assignedWorkerId = assignment.getWorkerId();
         final String workerId = this.workerConfig.getWorkerId();
-        
+
         if (assignedWorkerId.equals(workerId)) {
-            
restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+            
stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), 
restart);
             return Response.status(Status.OK).build();
         } else {
             // query other worker
@@ -355,8 +356,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             }
 
             URI redirect = null;
-            final String redirectUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart";,
-                    workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName, instanceId);
+            String action = restart ? "restart" : "stop";
+            final String redirectUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s";,
+                    workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName, instanceId,
+                    action);
             try {
                 redirect = new URI(redirectUrl);
             } catch (URISyntaxException e) {
@@ -369,7 +372,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
-    public Response restartFunctionInstances(String tenant, String namespace, 
String functionName) throws Exception {
+    public Response stopFunctionInstances(String tenant, String namespace, 
String functionName, boolean restart)
+            throws Exception {
         final String fullFunctionName = String.format("%s/%s/%s", tenant, 
namespace, functionName);
         Collection<Assignment> assignments = 
this.findFunctionAssignments(tenant, namespace, functionName);
 
@@ -382,7 +386,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             final String workerId = this.workerConfig.getWorkerId();
             String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
             if (assignedWorkerId.equals(workerId)) {
-                restartFunction(fullyQualifiedInstanceId);
+                stopFunction(fullyQualifiedInstanceId, restart);
             } else {
                 List<WorkerInfo> workerInfoList = 
this.membershipManager.getCurrentMembership();
                 WorkerInfo workerInfo = null;
@@ -398,10 +402,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                     continue;
                 }
                 Client client = ClientBuilder.newClient();
+                String action = restart ? "restart" : "stop";
                 // TODO: create and use pulsar-admin to support authorization 
and authentication and manage redirect
-                final String instanceRestartUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart";,
+                final String instanceRestartUrl = 
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s";,
                         workerInfo.getWorkerHostname(), workerInfo.getPort(), 
tenant, namespace, functionName,
-                        assignment.getInstance().getInstanceId());
+                        assignment.getInstance().getInstanceId(), action);
                 
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
                         .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
             }
@@ -409,15 +414,17 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return Response.status(Status.OK).build();
     }
 
-    private void restartFunction(String fullyQualifiedInstanceId) throws 
Exception {
-        log.info("[{}] restarting..", fullyQualifiedInstanceId);
+    private void stopFunction(String fullyQualifiedInstanceId, boolean 
restart) throws Exception {
+        log.info("[{}] {}..", restart ? "restarting" : "stopping", 
fullyQualifiedInstanceId);
         FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
         if (functionRuntimeInfo != null) {
             this.functionActioner.stopFunction(functionRuntimeInfo);
             try {
-                this.functionActioner.startFunction(functionRuntimeInfo);
+                if(restart) {
+                    this.functionActioner.startFunction(functionRuntimeInfo);  
  
+                }
             } catch (Exception ex) {
-                log.info("{} Error starting function", 
fullyQualifiedInstanceId, ex);
+                log.info("{} Error re-starting function", 
fullyQualifiedInstanceId, ex);
                 functionRuntimeInfo.setStartupException(ex);
                 throw ex;
             }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index a9e03de..c2747f4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -386,8 +385,18 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
+    public Response stopFunctionInstance(final String tenant, final String 
namespace, final String functionName,
+            final String instanceId) {
+        return stopFunctionInstance(tenant, namespace, functionName, 
instanceId, false);
+    }
+
     public Response restartFunctionInstance(final String tenant, final String 
namespace, final String functionName,
             final String instanceId) {
+        return stopFunctionInstance(tenant, namespace, functionName, 
instanceId, true);
+    }
+
+    public Response stopFunctionInstance(final String tenant, final String 
namespace, final String functionName,
+            final String instanceId, boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -411,8 +420,8 @@ public class FunctionsImpl {
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.restartFunctionInstance(tenant, 
namespace, functionName,
-                    Integer.parseInt(instanceId));
+            return functionRuntimeManager.stopFunctionInstance(tenant, 
namespace, functionName,
+                    Integer.parseInt(instanceId), restart);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
@@ -421,7 +430,16 @@ public class FunctionsImpl {
         }
     }
 
+    public Response stopFunctionInstances(final String tenant, final String 
namespace, final String functionName) {
+        return stopFunctionInstances(tenant, namespace, functionName, false);
+    }
+
     public Response restartFunctionInstances(final String tenant, final String 
namespace, final String functionName) {
+        return stopFunctionInstances(tenant, namespace, functionName, true);
+    }
+
+    public Response stopFunctionInstances(final String tenant, final String 
namespace, final String functionName,
+            boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -445,8 +463,8 @@ public class FunctionsImpl {
 
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.restartFunctionInstances(tenant, 
namespace, functionName);
-        }catch (Exception e) {
+            return functionRuntimeManager.stopFunctionInstances(tenant, 
namespace, functionName, restart);
+        } catch (Exception e) {
             log.error("Failed to restart function: {}/{}/{}", tenant, 
namespace, functionName, e);
             return 
Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.getMessage()).build();
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 3581453..ddea22a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,9 +40,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.functions.worker.WorkerInfo;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import org.glassfish.jersey.media.multipart.FormDataParam;
 
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -189,7 +185,32 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
             final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
         return functions.restartFunctionInstances(tenant, namespace, 
functionName);
     }
-    
+
+    @POST
+    @ApiOperation(value = "Stop function instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, functionName, 
instanceId);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all function instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{functionName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopFunction(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("functionName") String functionName) {
+        return functions.stopFunctionInstances(tenant, namespace, 
functionName);
+    }
+
     @POST
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)

Reply via email to