This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4334d628fbffebc39d6d162911e1c9532c6fea83
Author: Sanjeev Kulkarni <sanj...@streaml.io>
AuthorDate: Mon Oct 8 09:32:34 2018 -0700

    Made pulsar admin to use Source/Sink endpoints
---
 pulsar-client-admin/pom.xml                        |   6 +
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  36 +--
 .../java/org/apache/pulsar/client/admin/Sink.java  | 277 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Source.java     | 277 +++++++++++++++++++++
 .../pulsar/client/admin/internal/SinkImpl.java     | 270 ++++++++++++++++++++
 .../pulsar/client/admin/internal/SourceImpl.java   | 270 ++++++++++++++++++++
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  24 +-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  24 +-
 8 files changed, 1147 insertions(+), 37 deletions(-)

diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index b42ea2c..ac39524 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -46,6 +46,12 @@
       <version>${project.version}</version>
     </dependency>
 
+      <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>pulsar-functions-utils</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+
     <dependency>
       <groupId>org.glassfish.jersey.core</groupId>
       <artifactId>jersey-client</artifactId>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index a1502b0..0953d22 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -32,21 +32,7 @@ import javax.ws.rs.client.WebTarget;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.pulsar.client.admin.internal.BookiesImpl;
-import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
-import org.apache.pulsar.client.admin.internal.BrokersImpl;
-import org.apache.pulsar.client.admin.internal.ClustersImpl;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
-import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
-import org.apache.pulsar.client.admin.internal.LookupImpl;
-import org.apache.pulsar.client.admin.internal.NamespacesImpl;
-import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
-import org.apache.pulsar.client.admin.internal.SchemasImpl;
-import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerImpl;
-import org.apache.pulsar.client.admin.internal.TenantsImpl;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
+import org.apache.pulsar.client.admin.internal.*;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -85,6 +71,8 @@ public class PulsarAdmin implements Closeable {
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
+    private final Source source;
+    private final Sink sink;
     private final Worker worker;
     private final Schemas schemas;
     protected final WebTarget root;
@@ -189,6 +177,8 @@ public class PulsarAdmin implements Closeable {
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
+        this.source = new SourceImpl(root, auth);
+        this.sink = new SinkImpl(root, auth);
         this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
@@ -358,6 +348,22 @@ public class PulsarAdmin implements Closeable {
     }
 
     /**
+     *
+     * @return the source management object
+     */
+    public Source source() {
+        return source;
+    }
+
+    /**
+     *
+     * @return the sink management object
+     */
+    public Sink sink() {
+        return sink;
+    }
+
+    /**
     *
     * @return the Worker stats
     */
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
new file mode 100644
index 0000000..3f8fe2f
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Sink management.
+ */
+public interface Sink {
+    /**
+     * Get the list of sinks.
+     * <p>
+     * Get the list of all the Pulsar Sinks.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["f1", "f2", "f3"]</code>
+     * </pre>
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getSinks(String tenant, String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get the configuration for the specified sink.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @return the sink configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of 
the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Function.FunctionDetails getSink(String tenant, String namespace, String 
sink) throws PulsarAdminException;
+
+    /**
+     * Create a new sink.
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException;
+
+    /**
+     * <pre>
+     * Create a new sink by providing url from which fun-pkg can be 
downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws PulsarAdminException
+     */
+    void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException;
+
+    /**
+     * Update the configuration for a sink.
+     * <p>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException;
+
+    /**
+     * Update the configuration for a sink.
+     * <pre>
+     * Update a sink by providing url from which fun-pkg can be downloaded. 
supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sinkConfig
+     *            the sink configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException;
+
+    /**
+     * Delete an existing sink
+     * <p>
+     * Delete a sink
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster does not exist
+     * @throws PreconditionFailedException
+     *             Cluster is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
+
+    /**
+     * Gets the current status of a sink.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FunctionStatusList getSinkStatus(String tenant, String namespace, String 
sink) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a sink instance.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     * @param id
+     *            Sink instance-id
+     * @return
+     * @throws PulsarAdminException
+     */
+    FunctionStatus getSinkStatus(String tenant, String namespace, String sink, 
int id)
+            throws PulsarAdminException;
+
+    /**
+     * Restart sink instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @param instanceId
+     *            Sink instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Restart all sink instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
+
+
+    /**
+     * Stop sink instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @param instanceId
+     *            Sink instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Stop all sink instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param sink
+     *            Sink name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sinks currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException;
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
new file mode 100644
index 0000000..3c43cf2
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Source management.
+ */
+public interface Source {
+    /**
+     * Get the list of sources.
+     * <p>
+     * Get the list of all the Pulsar Sources.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["f1", "f2", "f3"]</code>
+     * </pre>
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getSources(String tenant, String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get the configuration for the specified source.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @return the source configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of 
the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Function.FunctionDetails getSource(String tenant, String namespace, String 
source) throws PulsarAdminException;
+
+    /**
+     * Create a new source.
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSource(SourceConfig sourceConfig, String fileName) throws 
PulsarAdminException;
+
+    /**
+     * <pre>
+     * Create a new source by providing url from which fun-pkg can be 
downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws PulsarAdminException
+     */
+    void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws 
PulsarAdminException;
+
+    /**
+     * Update the configuration for a source.
+     * <p>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSource(SourceConfig sourceConfig, String fileName) throws 
PulsarAdminException;
+
+    /**
+     * Update the configuration for a source.
+     * <pre>
+     * Update a source by providing url from which fun-pkg can be downloaded. 
supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     * </pre>
+     *
+     * @param sourceConfig
+     *            the source configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws 
PulsarAdminException;
+
+    /**
+     * Delete an existing source
+     * <p>
+     * Delete a source
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster does not exist
+     * @throws PreconditionFailedException
+     *             Cluster is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
+
+    /**
+     * Gets the current status of a source.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FunctionStatusList getSourceStatus(String tenant, String namespace, String 
source) throws PulsarAdminException;
+
+    /**
+     * Gets the current status of a source instance.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     * @param id
+     *            Source instance-id
+     * @return
+     * @throws PulsarAdminException
+     */
+    FunctionStatus getSourceStatus(String tenant, String namespace, String 
source, int id)
+            throws PulsarAdminException;
+
+    /**
+     * Restart source instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @param instanceId
+     *            Source instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Restart all source instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void restartSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
+
+
+    /**
+     * Stop source instance
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @param instanceId
+     *            Source instanceId
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
+
+    /**
+     * Stop all source instances
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param source
+     *            Source name
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void stopSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sources currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException;
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
new file mode 100644
index 0000000..b74f4ff
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Sink;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SinkImpl extends BaseResource implements Sink {
+
+    private final WebTarget sink;
+
+    public SinkImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.sink = web.path("/admin/sink");
+    }
+
+    @Override
+    public List<String> getSinks(String tenant, String namespace) throws 
PulsarAdminException {
+        try {
+            Response response = 
request(sink.path(tenant).path(namespace)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionDetails getSink(String tenant, String namespace, String 
sinkName) throws PulsarAdminException {
+        try {
+             Response response = 
request(sink.path(tenant).path(namespace).path(sinkName)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+            mergeJson(jsonResponse, functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatusList getSinkStatus(
+            String tenant, String namespace, String sinkName) throws 
PulsarAdminException {
+        try {
+            Response response = 
request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatusList.Builder functionStatusBuilder = 
FunctionStatusList.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatus getSinkStatus(
+            String tenant, String namespace, String sinkName, int id) throws 
PulsarAdminException {
+        try {
+            Response response = request(
+                    
sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status"))
+                            .get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatus.Builder functionStatusBuilder = 
FunctionStatus.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, 
MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                    new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteSink(String cluster, String namespace, String function) 
throws PulsarAdminException {
+        try {
+            request(sink.path(cluster).path(namespace).path(function))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig",
+                    new Gson().toJson(sinkConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, 
MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sinkConfig", new 
Gson().toJson(sinkConfig),
+                    MediaType.APPLICATION_JSON_TYPE));
+            
request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace())
+                    .path(sinkConfig.getName())).put(Entity.entity(mp, 
MediaType.MULTIPART_FORM_DATA),
+                            ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSink(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSink(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSink(String tenant, String namespace, String sinkName, int 
instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSink(String tenant, String namespace, String sinkName) 
throws PulsarAdminException {
+        try {
+            
request(sink.path(tenant).path(namespace).path(sinkName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<ConnectorDefinition> getBuiltInSinks() throws 
PulsarAdminException {
+        try {
+            Response response = request(sink.path("builtinsink")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new 
GenericType<List<ConnectorDefinition>>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
+    public static void mergeJson(String json, Builder builder) throws 
IOException {
+        JsonFormat.parser().merge(json, builder);
+    }
+
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
new file mode 100644
index 0000000..3ae0a6f
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SourceImpl extends BaseResource implements Source {
+
+    private final WebTarget source;
+
+    public SourceImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.source = web.path("/admin/source");
+    }
+
+    @Override
+    public List<String> getSources(String tenant, String namespace) throws 
PulsarAdminException {
+        try {
+            Response response = 
request(source.path(tenant).path(namespace)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionDetails getSource(String tenant, String namespace, String 
sourceName) throws PulsarAdminException {
+        try {
+             Response response = 
request(source.path(tenant).path(namespace).path(sourceName)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+            mergeJson(jsonResponse, functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatusList getSourceStatus(
+            String tenant, String namespace, String sourceName) throws 
PulsarAdminException {
+        try {
+            Response response = 
request(source.path(tenant).path(namespace).path(sourceName).path("status")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatusList.Builder functionStatusBuilder = 
FunctionStatusList.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FunctionStatus getSourceStatus(
+            String tenant, String namespace, String sourceName, int id) throws 
PulsarAdminException {
+        try {
+            Response response = request(
+                    
source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status"))
+                            .get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            FunctionStatus.Builder functionStatusBuilder = 
FunctionStatus.newBuilder();
+            mergeJson(jsonResponse, functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSource(SourceConfig sourceConfig, String fileName) 
throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) 
throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, 
MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                    new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteSource(String cluster, String namespace, String 
function) throws PulsarAdminException {
+        try {
+            request(source.path(cluster).path(namespace).path(function))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSource(SourceConfig sourceConfig, String fileName) 
throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig",
+                    new Gson().toJson(sourceConfig),
+                MediaType.APPLICATION_JSON_TYPE));
+            
request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) 
throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+
+            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, 
MediaType.TEXT_PLAIN_TYPE));
+
+            mp.bodyPart(new FormDataBodyPart("sourceConfig", new 
Gson().toJson(sourceConfig),
+                    MediaType.APPLICATION_JSON_TYPE));
+            
request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace())
+                    .path(sourceConfig.getName())).put(Entity.entity(mp, 
MediaType.MULTIPART_FORM_DATA),
+                            ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSource(String tenant, String namespace, String 
functionName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void restartSource(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(functionName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName, 
int instanceId)
+            throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName) 
throws PulsarAdminException {
+        try {
+            
request(source.path(tenant).path(namespace).path(sourceName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<ConnectorDefinition> getBuiltInSources() throws 
PulsarAdminException {
+        try {
+            Response response = request(source.path("builtinsource")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(new 
GenericType<List<ConnectorDefinition>>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
+    public static void mergeJson(String json, Builder builder) throws 
IOException {
+        JsonFormat.parser().merge(json, builder);
+    }
+
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index bd5af79..e56a343 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public class CmdSinks extends CmdBase {
         jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("localrun", localSinkRunner);
-        jcommander.addCommand("available-sinks", new ListSinks());
+        jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
 
     /**
@@ -187,9 +188,9 @@ public class CmdSinks extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                
admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), 
sinkConfig.getArchive());
+                admin.sink().createSinkWithUrl(sinkConfig, 
sinkConfig.getArchive());
             } else {
-                
admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), 
sinkConfig.getArchive());
+                admin.sink().createSink(sinkConfig, sinkConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -200,9 +201,9 @@ public class CmdSinks extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                
admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), 
sinkConfig.getArchive());
+                admin.sink().updateSinkWithUrl(sinkConfig, 
sinkConfig.getArchive());
             } else {
-                
admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), 
sinkConfig.getArchive());
+                admin.sink().updateSink(sinkConfig, sinkConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -282,6 +283,8 @@ public class CmdSinks extends CmdBase {
 
         protected SinkConfig sinkConfig;
 
+        protected NarClassLoader classLoader;
+
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = 
DEPRECATED_subsName;
             if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern 
= DEPRECATED_topicsPattern;
@@ -449,7 +452,6 @@ public class CmdSinks extends CmdBase {
             }
 
             // if jar file is present locally then load jar and validate 
SinkClass in it
-            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath 
+ " does not exist");
@@ -482,14 +484,14 @@ public class CmdSinks extends CmdBase {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
                     = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            
Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), 
functionDetailsBuilder);
+            
Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, 
classLoader)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
         protected String validateSinkType(String sinkType) throws IOException {
             Set<String> availableSinks;
             try {
-                availableSinks = admin.functions().getSinks();
+                availableSinks = 
admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -533,16 +535,16 @@ public class CmdSinks extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            admin.functions().deleteFunction(tenant, namespace, name);
+            admin.sink().deleteSink(tenant, namespace, name);
             print("Deleted successfully");
         }
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sinks supported by Pulsar cluster")
-    public class ListSinks extends BaseCommand {
+    public class ListBuiltInSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
+            admin.sink().getBuiltInSinks().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 5d0c84c..f2d7681 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -37,6 +37,7 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public class CmdSources extends CmdBase {
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("localrun", localSourceRunner);
-        jcommander.addCommand("available-sources", new ListSources());
+        jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
 
     /**
@@ -187,9 +188,9 @@ public class CmdSources extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if 
(Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
-                
admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig),
 sourceConfig.getArchive());
+                admin.source().createSourceWithUrl(sourceConfig, 
sourceConfig.getArchive());
             } else {
-                
admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), 
sourceConfig.getArchive());
+                admin.source().createSource(sourceConfig, 
sourceConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -200,9 +201,9 @@ public class CmdSources extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if 
(Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
-                
admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig),
 sourceConfig.getArchive());
+                admin.source().updateSourceWithUrl(sourceConfig, 
sourceConfig.getArchive());
             } else {
-                
admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), 
sourceConfig.getArchive());
+                admin.source().updateSource(sourceConfig, 
sourceConfig.getArchive());
             }
             print("Updated successfully");
         }
@@ -267,6 +268,8 @@ public class CmdSources extends CmdBase {
 
         protected SourceConfig sourceConfig;
 
+        protected NarClassLoader classLoader;
+
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
             if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) 
destinationTopicName = DEPRECATED_destinationTopicName;
@@ -403,7 +406,6 @@ public class CmdSources extends CmdBase {
 
 
             // if jar file is present locally then load jar and validate 
SinkClass in it
-            ClassLoader classLoader = null;
             if (archivePath != null) {
                 if (!fileExists(archivePath)) {
                     throw new ParameterException("Archive file " + archivePath 
+ " does not exist");
@@ -436,14 +438,14 @@ public class CmdSources extends CmdBase {
                 throws IOException {
             org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
                     = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            
Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)),
 functionDetailsBuilder);
+            
Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, 
classLoader)), functionDetailsBuilder);
             return functionDetailsBuilder.build();
         }
 
         protected String validateSourceType(String sourceType) throws 
IOException {
             Set<String> availableSources;
             try {
-                availableSources = admin.functions().getSources();
+                availableSources = 
admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -487,16 +489,16 @@ public class CmdSources extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            admin.functions().deleteFunction(tenant, namespace, name);
+            admin.source().deleteSource(tenant, namespace, name);
             print("Delete source successfully");
         }
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sources supported by Pulsar cluster")
-    public class ListSources extends BaseCommand {
+    public class ListBuiltInSources extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
+            admin.source().getBuiltInSources().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));

Reply via email to