This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ee81c7e7ad6a7b4f466ba4abf478545e6d3bccf2
Author: Sanjeev Kulkarni <sanj...@streaml.io>
AuthorDate: Thu Oct 4 16:56:26 2018 -0700

    First cut of adding new endpoints for source/sink
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |   4 +-
 .../apache/pulsar/broker/admin/impl/SinkBase.java  | 258 ++++++++++++++++++++
 .../pulsar/broker/admin/impl/SourceBase.java       | 261 +++++++++++++++++++++
 .../org/apache/pulsar/broker/admin/v1/Sink.java    |  29 +++
 .../org/apache/pulsar/broker/admin/v1/Source.java  |  29 +++
 .../org/apache/pulsar/broker/admin/v2/Sink.java    |  34 +++
 .../org/apache/pulsar/broker/admin/v2/Source.java  |  34 +++
 .../runtime/KubernetesRuntimeFactory.java          |   3 +-
 .../src/main/resources/java_instance_log4j2.yml    |   4 +-
 .../pulsar/functions/worker/rest/Resources.java    |   4 +
 .../functions/worker/rest/api/FunctionsImpl.java   |  64 +++--
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |   4 +-
 .../worker/rest/api/v2/SinkApiV2Resource.java      | 180 ++++++++++++++
 .../worker/rest/api/v2/SourceApiV2Resource.java    | 180 ++++++++++++++
 14 files changed, 1060 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index cff3c34..d20bc62 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
                                      final @FormDataParam("functionConfig") 
String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, 
clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, 
null, clientAppId());
     }
 
     @PUT
@@ -106,7 +106,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi
                                    final @FormDataParam("functionConfig") 
String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, 
clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, 
null, clientAppId());
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
new file mode 100644
index 0000000..43ca8bf
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Base JAX-RS resource holding the Pulsar sink admin endpoints.
+ *
+ * <p>Every handler forwards to a {@link FunctionsImpl} that is constructed with this resource as
+ * its {@code Supplier<WorkerService>}; the worker service is looked up through {@code pulsar()}
+ * each time {@link #get()} is called.
+ */
+public class SinkBase extends AdminResource implements Supplier<WorkerService> {
+
+    /** Delegate that implements every endpoint declared in this class. */
+    private final FunctionsImpl functions;
+
+    /** Creates the resource and wires the delegate to use this instance as its supplier. */
+    public SinkBase() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    /**
+     * Supplies the worker service to the {@link FunctionsImpl} delegate.
+     *
+     * @return the value of {@code pulsar().getWorkerService()}
+     */
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    /**
+     * Registers a sink from a multipart form upload.
+     *
+     * <p>Forwards to {@code FunctionsImpl#registerFunction}. Of the four arguments that follow the
+     * package URL only the last one is populated (with {@code sinkConfigJson}); the three before
+     * it are passed as {@code null}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @param uploadedInputStream content of the {@code data} form part
+     * @param fileDetail content disposition of the {@code data} form part
+     * @param functionPkgUrl value of the {@code url} form part
+     * @param sinkConfigJson value of the {@code sinkConfig} form part
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (sink already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Sink successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSink(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sinkName") String sinkName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+    }
+
+    /**
+     * Updates a sink from a multipart form upload.
+     *
+     * <p>Forwards to {@code FunctionsImpl#updateFunction} with the same argument layout as
+     * {@link #registerSink}: only the last of the four arguments after the package URL is set.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @param uploadedInputStream content of the {@code data} form part
+     * @param fileDetail content disposition of the {@code data} form part
+     * @param functionPkgUrl value of the {@code url} form part
+     * @param sinkConfigJson value of the {@code sinkConfig} form part
+     * @return the response produced by the delegate
+     */
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (sink doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Sink successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+    }
+
+    /**
+     * Deletes a sink by forwarding to {@code FunctionsImpl#deregisterFunction}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @return the response produced by the delegate
+     */
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The sink doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The sink was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response deregisterSink(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sinkName") String sinkName) {
+        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+    }
+
+    /**
+     * Fetches the stored information for a sink via {@code FunctionsImpl#getFunctionInfo}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @return the response produced by the delegate
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Sink currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The sink doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response getSinkInfo(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("sinkName") String sinkName) throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sinkName);
+    }
+
+    /**
+     * Reports the status of one sink instance via {@code FunctionsImpl#getFunctionInstanceStatus}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Sink instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The sink doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+    public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("sinkName") String sinkName,
+                                          final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Reports the overall status of a sink via {@code FunctionsImpl#getFunctionStatus}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @return the response produced by the delegate
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Sink running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{sinkName}/status")
+    public Response getSinkStatus(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sinkName") String sinkName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sinkName);
+    }
+
+    /**
+     * Lists what {@code FunctionsImpl#listFunctions} returns for the namespace.
+     *
+     * <p>NOTE(review): the delegate call takes no sink-specific argument, so the listing is
+     * presumably not restricted to sinks - confirm against {@code FunctionsImpl}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @return the response produced by the delegate
+     */
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Sinks currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listSinks(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+    }
+
+    /**
+     * Restarts one sink instance via {@code FunctionsImpl#restartFunctionInstance}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     */
+    @POST
+    @ApiOperation(value = "Restart sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Restarts every instance of a sink via {@code FunctionsImpl#restartFunctionInstances}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Restart all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    /**
+     * Stops one sink instance via {@code FunctionsImpl#stopFunctionInstance}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     */
+    @POST
+    @ApiOperation(value = "Stop sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Stops every instance of a sink via {@code FunctionsImpl#stopFunctionInstances}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sinkName sink name path segment
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Stop all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    /**
+     * Returns the connectors reported by {@code FunctionsImpl#getListOfConnectors} that declare
+     * a sink class.
+     *
+     * @return a new list holding, in their original order, the definitions whose
+     *         {@code getSinkClass()} is neither {@code null} nor empty
+     */
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/builtinsinks")
+    public List<ConnectorDefinition> getSinkList() {
+        // Filter a private copy so the list handed out by the delegate is never mutated.
+        List<ConnectorDefinition> sinkConnectors = new ArrayList<>(functions.getListOfConnectors());
+        sinkConnectors.removeIf(connector -> StringUtils.isEmpty(connector.getSinkClass()));
+        return sinkConnectors;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
new file mode 100644
index 0000000..b4428ab
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Base JAX-RS resource holding the Pulsar source admin endpoints.
+ *
+ * <p>Every handler forwards to a {@link FunctionsImpl} that is constructed with this resource as
+ * its {@code Supplier<WorkerService>}; the worker service is looked up through {@code pulsar()}
+ * each time {@link #get()} is called.
+ */
+public class SourceBase extends AdminResource implements Supplier<WorkerService> {
+
+    /** Delegate that implements every endpoint declared in this class. */
+    private final FunctionsImpl functions;
+
+    /** Creates the resource and wires the delegate to use this instance as its supplier. */
+    public SourceBase() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    /**
+     * Supplies the worker service to the {@link FunctionsImpl} delegate.
+     *
+     * @return the value of {@code pulsar().getWorkerService()}
+     */
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    /**
+     * Registers a source from a multipart form upload.
+     *
+     * <p>Forwards to {@code FunctionsImpl#registerFunction}. Of the four arguments that follow the
+     * package URL only the third one is populated (with {@code sourceConfigJson}); the other
+     * three are passed as {@code null}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param uploadedInputStream content of the {@code data} form part
+     * @param fileDetail content disposition of the {@code data} form part
+     * @param functionPkgUrl value of the {@code url} form part
+     * @param sourceConfigJson value of the {@code sourceConfig} form part
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (source already exists, etc.)"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "Pulsar Source successfully created")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSource(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sourceName") String sourceName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+    }
+
+    /**
+     * Updates a source from a multipart form upload.
+     *
+     * <p>Forwards to {@code FunctionsImpl#updateFunction} with the same argument layout as
+     * {@link #registerSource}: only the third of the four arguments after the package URL is set.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param uploadedInputStream content of the {@code data} form part
+     * @param fileDetail content disposition of the {@code data} form part
+     * @param functionPkgUrl value of the {@code url} form part
+     * @param sourceConfigJson value of the {@code sourceConfig} form part
+     * @return the response produced by the delegate
+     */
+    @PUT
+    @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request (source doesn't exist, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Source successfully updated")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSource(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sourceName") String sourceName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+    }
+
+    /**
+     * Deletes a source by forwarding to {@code FunctionsImpl#deregisterFunction}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by the delegate
+     */
+    @DELETE
+    @ApiOperation(value = "Deletes a Pulsar Source currently running in cluster mode")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The source doesn't exist"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 200, message = "The source was successfully deleted")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response deregisterSource(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("sourceName") String sourceName) {
+        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+    }
+
+    /**
+     * Fetches the stored information for a source via {@code FunctionsImpl#getFunctionInfo}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by the delegate
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Fetches information about a Pulsar Source currently running in cluster mode",
+            response = FunctionMetaData.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 404, message = "The source doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response getSourceInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sourceName") String sourceName) throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Reports the status of one source instance via
+     * {@code FunctionsImpl#getFunctionInstanceStatus}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Source instance",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The source doesn't exist")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+    public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant,
+                                            final @PathParam("namespace") String namespace,
+                                            final @PathParam("sourceName") String sourceName,
+                                            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Reports the overall status of a source via {@code FunctionsImpl#getFunctionStatus}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by the delegate
+     * @throws IOException declared for the delegate call
+     */
+    @GET
+    @ApiOperation(
+            value = "Displays the status of a Pulsar Source running in cluster mode",
+            response = FunctionStatus.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}/{sourceName}/status")
+    public Response getSourceStatus(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("sourceName") String sourceName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Lists what {@code FunctionsImpl#listFunctions} returns for the namespace.
+     *
+     * <p>NOTE(review): the delegate call takes no source-specific argument, so the listing is
+     * presumably not restricted to sources - confirm against {@code FunctionsImpl}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @return the response produced by the delegate
+     */
+    @GET
+    @ApiOperation(
+            value = "Lists all Pulsar Sources currently deployed in a given namespace",
+            response = String.class,
+            responseContainer = "Collection"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/{tenant}/{namespace}")
+    public Response listSources(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+    }
+
+    /**
+     * Restarts one source instance via {@code FunctionsImpl#restartFunctionInstance}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     */
+    @POST
+    @ApiOperation(value = "Restart source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Restarts every instance of a source via {@code FunctionsImpl#restartFunctionInstances}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Restart all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Stops one source instance via {@code FunctionsImpl#stopFunctionInstance}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by the delegate, which also receives the request URI
+     */
+    @POST
+    @ApiOperation(value = "Stop source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Stops every instance of a source via {@code FunctionsImpl#stopFunctionInstances}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by the delegate
+     */
+    @POST
+    @ApiOperation(value = "Stop all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 404, message = "The source does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Returns the connectors reported by {@code FunctionsImpl#getListOfConnectors} that declare
+     * a source class.
+     *
+     * @return a new list holding, in their original order, the definitions whose
+     *         {@code getSourceClass()} is neither {@code null} nor empty
+     */
+    @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO source connectors currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Path("/builtinsources")
+    public List<ConnectorDefinition> getSourceList() {
+        // Filter a private copy so the list handed out by the delegate is never mutated.
+        List<ConnectorDefinition> sourceConnectors = new ArrayList<>(functions.getListOfConnectors());
+        sourceConnectors.removeIf(connector -> StringUtils.isEmpty(connector.getSourceClass()));
+        return sourceConnectors;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java
new file mode 100644
index 0000000..3d73d4d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v1;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SinkBase;
+
+import javax.ws.rs.Path;
+
+/**
+ * Resource class in the {@code admin.v1} package that exposes the endpoints inherited from
+ * {@link SinkBase} under the {@code /sink} root path.
+ *
+ * <p>Declares no members of its own; its {@code @Api} annotation sets {@code hidden = true}.
+ */
+@Path("/sink")
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = 
+true)
+public class Sink extends SinkBase {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java
new file mode 100644
index 0000000..99b41dc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v1;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SourceBase;
+
+import javax.ws.rs.Path;
+
+@Path("/source")
+@Api(value = "/source", description = "Source admin apis", tags = "source", 
hidden = true)
+public class Source extends SourceBase {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
new file mode 100644
index 0000000..aea0ae7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SinkBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/sink")
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = 
true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Sink extends SinkBase {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
new file mode 100644
index 0000000..e5ef56c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SourceBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/source")
+@Api(value = "/source", description = "Source admin apis", tags = "source", 
hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Source extends SourceBase {
+}
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 8ba7505..cba9ebf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -141,7 +141,8 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
         KubernetesRuntime.doChecks(functionDetails);
     }
 
-    private void setupClient() throws Exception {
+    @VisibleForTesting
+    void setupClient() throws Exception {
         if (appsClient == null) {
             if (k8Uri == null) {
                 log.info("k8Uri is null thus going by defaults");
diff --git 
a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml 
b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
index df25367..9846f05 100644
--- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
+++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
@@ -44,7 +44,7 @@ Configuration:
       name: RollingFile
       fileName: 
"${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
       filePattern: 
"${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
-      immediateFlush: false
+      immediateFlush: true
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
       Policies:
@@ -72,7 +72,7 @@ Configuration:
       name: BkRollingFile
       fileName: 
"${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
       filePattern: 
"${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
-      immediateFlush: false
+      immediateFlush: true
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
       Policies:
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 26c7127..ac011db 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker.rest;
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
@@ -36,6 +38,8 @@ public final class Resources {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
+                        SourceApiV2Resource.class,
+                        SinkApiV2Resource.class,
                         WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3124731..0d30e37 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,7 +27,6 @@ import static 
org.apache.pulsar.functions.utils.Reflections.loadJar;
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
 import static 
org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
-import static 
org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create;
 
 import com.google.gson.Gson;
 
@@ -39,10 +38,7 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.CopyOption;
 import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -89,9 +85,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
+import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -136,6 +130,7 @@ public class FunctionsImpl {
     public Response registerFunction(final String tenant, final String 
namespace, final String functionName,
             final InputStream uploadedInputStream, final 
FormDataContentDisposition fileDetail,
             final String functionPkgUrl, final String functionDetailsJson, 
final String functionConfigJson,
+            final String sourceConfigJson, final String sinkConfigJson,
             final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
@@ -173,10 +168,10 @@ public class FunctionsImpl {
         try {
             if (isPkgUrlProvided) {
                 functionDetails = 
validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, 
functionPkgUrl,
-                        functionDetailsJson, functionConfigJson);
+                        functionDetailsJson, functionConfigJson, 
sourceConfigJson, sinkConfigJson);
             } else {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson);
+                        fileDetail, functionDetailsJson, functionConfigJson, 
sourceConfigJson, sinkConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
@@ -216,7 +211,7 @@ public class FunctionsImpl {
     public Response updateFunction(final String tenant, final String 
namespace, final String functionName,
             final InputStream uploadedInputStream, final 
FormDataContentDisposition fileDetail,
             final String functionPkgUrl, final String functionDetailsJson, 
final String functionConfigJson,
-            final String clientRole) {
+            final String sourceConfigJson, final String sinkConfigJson, final 
String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -252,10 +247,10 @@ public class FunctionsImpl {
         try {
             if (isPkgUrlProvided) {
                 functionDetails = 
validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, 
functionPkgUrl,
-                        functionDetailsJson, functionConfigJson);
+                        functionDetailsJson, functionConfigJson, 
sourceConfigJson, sinkConfigJson);
             } else {
                 functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson);
+                        fileDetail, functionDetailsJson, functionConfigJson, 
sourceConfigJson, sinkConfigJson);
             }
         } catch (Exception e) {
             log.error("Invalid register function request @ /{}/{}/{}", tenant, 
namespace, functionName, e);
@@ -877,23 +872,24 @@ public class FunctionsImpl {
     }
 
     private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String 
tenant, String namespace, String functionName,
-            String functionPkgUrl, String functionDetailsJson, String 
functionConfigJson)
+            String functionPkgUrl, String functionDetailsJson, String 
functionConfigJson,
+            String sourceConfigJson, String sinkConfigJson)
             throws IllegalArgumentException, IOException, URISyntaxException {
         if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not 
valid. supported url (http/https/file)");
         }
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
-                functionDetailsJson, functionConfigJson, functionPkgUrl, null);
+                functionDetailsJson, functionConfigJson, sourceConfigJson, 
sinkConfigJson, functionPkgUrl, null);
         return functionDetails;
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             File uploadedInputStreamAsFile, FormDataContentDisposition 
fileDetail, String functionDetailsJson,
-            String functionConfigJson)
+            String functionConfigJson, String sourceConfigJson, String 
sinkConfigJson)
             throws IllegalArgumentException, IOException, URISyntaxException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
-                functionDetailsJson, functionConfigJson, null, 
uploadedInputStreamAsFile);
+                functionDetailsJson, functionConfigJson, sourceConfigJson, 
sinkConfigJson, null, uploadedInputStreamAsFile);
         if (!isFunctionCodeBuiltin(functionDetails) && 
(uploadedInputStreamAsFile == null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not 
provided");
         }
@@ -966,7 +962,8 @@ public class FunctionsImpl {
     }
 
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
-            String functionDetailsJson, String functionConfigJson, String 
functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, 
URISyntaxException {
+            String functionDetailsJson, String functionConfigJson, String 
sourceConfigJson,
+            String sinkConfigJson, String functionPkgUrl, File 
uploadedInputStreamAsFile) throws IOException, URISyntaxException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
@@ -977,11 +974,24 @@ public class FunctionsImpl {
             throw new IllegalArgumentException("Function Name is not 
provided");
         }
 
-        if (StringUtils.isEmpty(functionDetailsJson) && 
StringUtils.isEmpty(functionConfigJson)) {
-            throw new IllegalArgumentException("FunctionConfig is not 
provided");
+        int numDefinitions = 0; // number of non-empty definition payloads supplied
+        if (!StringUtils.isEmpty(functionDetailsJson)) {
+            numDefinitions++;
+        }
+        if (!StringUtils.isEmpty(functionConfigJson)) {
+            numDefinitions++;
+        }
+        if (!StringUtils.isEmpty(sourceConfigJson)) {
+            numDefinitions++;
         }
-        if (!StringUtils.isEmpty(functionDetailsJson) && 
!StringUtils.isEmpty(functionConfigJson)) {
-            throw new IllegalArgumentException("Only one of FunctionDetails or 
FunctionConfig should be provided");
+        if (!StringUtils.isEmpty(sinkConfigJson)) {
+            numDefinitions++;
+        }
+        if (numDefinitions == 0) {
+            throw new IllegalArgumentException("Function Info is not 
provided");
+        }
+        if (numDefinitions > 1) {
+            throw new IllegalArgumentException("Conflicting Info provided");
         }
         if (!StringUtils.isEmpty(functionConfigJson)) {
             FunctionConfig functionConfig = new 
Gson().fromJson(functionConfigJson, FunctionConfig.class);
@@ -995,6 +1005,18 @@ public class FunctionsImpl {
             ConfigValidation.validateConfig(functionConfig, 
functionConfig.getRuntime().name(), clsLoader);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
+        if (!StringUtils.isEmpty(sourceConfigJson)) {
+            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, 
SourceConfig.class);
+            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, 
uploadedInputStreamAsFile);
+            ConfigValidation.validateConfig(sourceConfig, 
FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            return SourceConfigUtils.convert(sourceConfig, clsLoader);
+        }
+        if (!StringUtils.isEmpty(sinkConfigJson)) {
+            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, 
SinkConfig.class);
+            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, 
uploadedInputStreamAsFile);
+            ConfigValidation.validateConfig(sinkConfig, 
FunctionConfig.Runtime.JAVA.name(), clsLoader);
+            return SinkConfigUtils.convert(sinkConfig, clsLoader);
+        }
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, 
functionDetailsBuilder);
         if (isNotBlank(functionPkgUrl)) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 1e44a60..a842b9e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -60,7 +60,7 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
                                      final @FormDataParam("functionConfig") 
String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, 
clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, 
null, clientAppId());
 
     }
 
@@ -77,7 +77,7 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
                                    final @FormDataParam("functionConfig") 
String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, 
clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, null, 
null, clientAppId());
 
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
new file mode 100644
index 0000000..175e2eb
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/sink")
+public class SinkApiV2Resource extends FunctionApiResource {
+
+    @POST
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSink(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String 
namespace,
+                                 final @PathParam("sinkName") String sinkName,
+                                 final @FormDataParam("data") InputStream 
uploadedInputStream,
+                                 final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String 
sinkPkgUrl,
+                                 final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
+                sinkPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName,
+                               final @FormDataParam("data") InputStream 
uploadedInputStream,
+                               final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String 
functionPkgUrl,
+                               final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, null, sinkConfigJson, 
clientAppId());
+
+    }
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response deregisterSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sinkName") String sinkName) {
+        return functions.deregisterFunction(tenant, namespace, sinkName, 
clientAppId());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}")
+    public Response getSinkInfo(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("sinkName") String sinkName)
+            throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+    public Response getSinkInstanceStatus(final @PathParam("tenant") String 
tenant,
+                                          final @PathParam("namespace") String 
namespace,
+                                          final @PathParam("sinkName") String 
sinkName,
+                                          final @PathParam("instanceId") 
String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{sinkName}/status")
+    public Response getSinkStatus(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String 
namespace,
+                                  final @PathParam("sinkName") String 
sinkName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}")
+    public Response listSink(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, 
instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sinkName") String sinkName) {
+        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop sink instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sinkName") String sinkName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, 
instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all sink instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sinkName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSink(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sinkName") String sinkName) {
+        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+    }
+
+    @GET
+    @Path("/builtinsinks")
+    public List<ConnectorDefinition> getSinkList() {
+        List<ConnectorDefinition> connectorDefinitions = 
functions.getListOfConnectors();
+        List<ConnectorDefinition> retval = new ArrayList<>();
+        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+                retval.add(connectorDefinition);
+            }
+        }
+        return retval;
+    }
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
new file mode 100644
index 0000000..9d25bf9
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/source")
+public class SourceApiV2Resource extends FunctionApiResource {
+
+    @POST
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerSource(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String 
namespace,
+                                     final @PathParam("sourceName") String 
sourceName,
+                                     final @FormDataParam("data") InputStream 
uploadedInputStream,
+                                     final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("url") String 
sourcePkgUrl,
+                                     final @FormDataParam("sourceConfig") 
String sourceConfigJson) {
+
+        return functions.registerFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+                sourcePkgUrl, null, null, sourceConfigJson, null, 
clientAppId());
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateSource(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String 
namespace,
+                                 final @PathParam("sourceName") String 
sourceName,
+                                 final @FormDataParam("data") InputStream 
uploadedInputStream,
+                                 final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String 
functionPkgUrl,
+                                 final @FormDataParam("sourceConfig") String 
sourceConfigJson) {
+
+        return functions.updateFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, null, null, sourceConfigJson, null, 
clientAppId());
+
+    }
+
+
+    /**
+     * Handles a DELETE for the source addressed by {@code tenant/namespace/sourceName}.
+     *
+     * <p>Delegates to {@code functions.deregisterFunction}, passing the caller's
+     * {@code clientAppId()} along with the identifiers.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by {@code functions.deregisterFunction}
+     */
+    @DELETE
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response deregisterSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sourceName") String sourceName) {
+        return functions.deregisterFunction(tenant, namespace, sourceName, 
clientAppId());
+    }
+
+    /**
+     * Handles a GET for the source addressed by {@code tenant/namespace/sourceName}.
+     *
+     * <p>Delegates to {@code functions.getFunctionInfo} with the three identifiers.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by {@code functions.getFunctionInfo}
+     * @throws IOException declared by this method; the delegate call is the only
+     *         statement in the body
+     */
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}")
+    public Response getSourceInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String 
namespace,
+                                  final @PathParam("sourceName") String 
sourceName)
+            throws IOException {
+        return functions.getFunctionInfo(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Handles a GET for the status of one instance of a source.
+     *
+     * <p>Delegates to {@code functions.getFunctionInstanceStatus}, passing the request
+     * URI of the current call along with the identifiers.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id path segment
+     * @return the response produced by {@code functions.getFunctionInstanceStatus}
+     * @throws IOException declared by this method; the delegate call is the only
+     *         statement in the body
+     */
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+    public Response getSourceInstanceStatus(final @PathParam("tenant") String 
tenant,
+                                            final @PathParam("namespace") 
String namespace,
+                                            final @PathParam("sourceName") 
String sourceName,
+                                            final @PathParam("instanceId") 
String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    }
+
+    /**
+     * Handles a GET for the status of the source addressed by
+     * {@code tenant/namespace/sourceName}.
+     *
+     * <p>Delegates to {@code functions.getFunctionStatus} with the three identifiers.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param sourceName source name path segment
+     * @return the response produced by {@code functions.getFunctionStatus}
+     * @throws IOException declared by this method; the delegate call is the only
+     *         statement in the body
+     */
+    @GET
+    @Path("/{tenant}/{namespace}/{sourceName}/status")
+    public Response getSourceStatus(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String 
namespace,
+                                    final @PathParam("sourceName") String 
sourceName) throws IOException {
+        return functions.getFunctionStatus(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Handles a GET on {@code tenant/namespace}.
+     *
+     * <p>Delegates to {@code functions.listFunctions} with the tenant and namespace.
+     * NOTE(review): the delegate is the generic function listing call; whether its
+     * result is restricted to sources cannot be told from this method -- confirm.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @return the response produced by {@code functions.listFunctions}
+     */
+    @GET
+    @Path("/{tenant}/{namespace}")
+    public Response listSources(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String 
namespace) {
+        return functions.listFunctions(tenant, namespace);
+
+    }
+
+    @POST
+    @ApiOperation(value = "Restart source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.restartFunctionInstance(tenant, namespace, 
sourceName, instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Restart all source instances", response = 
Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/restart")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response restartSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sourceName") String sourceName) {
+        return functions.restartFunctionInstances(tenant, namespace, 
sourceName);
+    }
+
+    @POST
+    @ApiOperation(value = "Stop source instance", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sourceName") String sourceName,
+            final @PathParam("instanceId") String instanceId) {
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, 
instanceId, this.uri.getRequestUri());
+    }
+
+    @POST
+    @ApiOperation(value = "Stop all source instances", response = Void.class)
+    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    @Path("/{tenant}/{namespace}/{sourceName}/stop")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response stopSource(final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace, final 
@PathParam("sourceName") String sourceName) {
+        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+    }
+
+    /**
+     * Returns the connector definitions that declare a source class.
+     *
+     * <p>Takes the list from {@code functions.getListOfConnectors()} and keeps, in the
+     * same order, only the entries whose {@code getSourceClass()} is not empty according
+     * to {@code StringUtils.isEmpty}.
+     *
+     * @return a new list holding the matching connector definitions
+     */
+    @GET
+    @Path("/builtinsources")
+    public List<ConnectorDefinition> getSourceList() {
+        // Work on a copy so the list handed back by getListOfConnectors() is not modified.
+        final List<ConnectorDefinition> sourceConnectors =
+                new ArrayList<>(functions.getListOfConnectors());
+        sourceConnectors.removeIf(definition -> StringUtils.isEmpty(definition.getSourceClass()));
+        return sourceConnectors;
+    }
+}

Reply via email to