This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4334d628fbffebc39d6d162911e1c9532c6fea83 Author: Sanjeev Kulkarni <sanj...@streaml.io> AuthorDate: Mon Oct 8 09:32:34 2018 -0700 Made pulsar admin to use Source/Sink endpoints --- pulsar-client-admin/pom.xml | 6 + .../apache/pulsar/client/admin/PulsarAdmin.java | 36 +-- .../java/org/apache/pulsar/client/admin/Sink.java | 277 +++++++++++++++++++++ .../org/apache/pulsar/client/admin/Source.java | 277 +++++++++++++++++++++ .../pulsar/client/admin/internal/SinkImpl.java | 270 ++++++++++++++++++++ .../pulsar/client/admin/internal/SourceImpl.java | 270 ++++++++++++++++++++ .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 24 +- .../org/apache/pulsar/admin/cli/CmdSources.java | 24 +- 8 files changed, 1147 insertions(+), 37 deletions(-) diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index b42ea2c..ac39524 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -46,6 +46,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-functions-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-client</artifactId> diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index a1502b0..0953d22 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -32,21 +32,7 @@ import javax.ws.rs.client.WebTarget; import org.apache.commons.lang3.StringUtils; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.pulsar.client.admin.internal.BookiesImpl; -import org.apache.pulsar.client.admin.internal.BrokerStatsImpl; -import 
org.apache.pulsar.client.admin.internal.BrokersImpl; -import org.apache.pulsar.client.admin.internal.ClustersImpl; -import org.apache.pulsar.client.admin.internal.FunctionsImpl; -import org.apache.pulsar.client.admin.internal.JacksonConfigurator; -import org.apache.pulsar.client.admin.internal.LookupImpl; -import org.apache.pulsar.client.admin.internal.NamespacesImpl; -import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl; -import org.apache.pulsar.client.admin.internal.SchemasImpl; -import org.apache.pulsar.client.admin.internal.TopicsImpl; -import org.apache.pulsar.client.admin.internal.WorkerImpl; -import org.apache.pulsar.client.admin.internal.TenantsImpl; -import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; -import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl; +import org.apache.pulsar.client.admin.internal.*; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -85,6 +71,8 @@ public class PulsarAdmin implements Closeable { private final String serviceUrl; private final Lookup lookups; private final Functions functions; + private final Source source; + private final Sink sink; private final Worker worker; private final Schemas schemas; protected final WebTarget root; @@ -189,6 +177,8 @@ public class PulsarAdmin implements Closeable { this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.lookups = new LookupImpl(root, auth, useTls); this.functions = new FunctionsImpl(root, auth); + this.source = new SourceImpl(root, auth); + this.sink = new SinkImpl(root, auth); this.worker = new WorkerImpl(root, auth); this.schemas = new SchemasImpl(root, auth); this.bookies = new BookiesImpl(root, auth); @@ -358,6 +348,22 @@ public class PulsarAdmin implements Closeable { } /** + * + * @return the source management object + */ + public Source source() { + return source; + } + + /** + * + * 
@return the sink management object + */ + public Sink sink() { + return sink; + } + + /** * * @return the Worker stats */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java new file mode 100644 index 0000000..3f8fe2f --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; +import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.utils.SinkConfig; + +import java.util.List; +import java.util.Set; + +/** + * Admin interface for Sink management. 
+ */ +public interface Sink { + /** + * Get the list of sinks. + * <p> + * Get the names of all the Pulsar Sinks in the given tenant and namespace. + * <p> + * Response Example: + * + * <pre> + * <code>["f1", "f2", "f3"]</code> + * </pre> + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * + * @return the list of sink names + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws PulsarAdminException + * Unexpected error + */ + List<String> getSinks(String tenant, String namespace) throws PulsarAdminException; + + /** + * Get the configuration for the specified sink. + * <p> + * The configuration is returned as a {@link Function.FunctionDetails} message. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @return the sink configuration + * + * @throws NotAuthorizedException + * You don't have admin permission to get the configuration of the sink + * @throws NotFoundException + * Sink doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Create a new sink. + * + * @param sinkConfig + * the sink configuration object + * @param fileName + * path of the sink package file to upload; the bundled SinkImpl uploads nothing when this is + * null or starts with <code>builtin://</code> + * + * @throws PulsarAdminException + * Unexpected error + */ + void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; + + /** + * <pre> + * Create a new sink by providing url from which the sink package can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * </pre> + * + * @param sinkConfig + * the sink configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws PulsarAdminException + * Unexpected error + */ + void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a sink.
+ * + * @param sinkConfig + * the sink configuration object + * @param fileName + * path of the sink package file to upload; the bundled SinkImpl uploads nothing when this is + * null or starts with <code>builtin://</code> + * + * @throws NotAuthorizedException + * You don't have admin permission to update the sink + * @throws NotFoundException + * Sink doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException; + + /** + * Update the configuration for a sink. + * <pre> + * Update a sink by providing url from which the sink package can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * </pre> + * + * @param sinkConfig + * the sink configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws NotAuthorizedException + * You don't have admin permission to update the sink + * @throws NotFoundException + * Sink doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Delete an existing sink. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws NotAuthorizedException + * You don't have admin permission + * @throws NotFoundException + * Sink does not exist + * @throws PreconditionFailedException + * A precondition for deleting the sink failed (TODO confirm the exact condition; the previous + * wording was inherited from the cluster API) + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Gets the current status of a sink. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @return the status list reported for the sink + * + * @throws PulsarAdminException + * Unexpected error + */ + FunctionStatusList getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Gets the current status of a sink instance.
+ * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * @param id + * Sink instance-id + * @return the status of the given sink instance + * @throws PulsarAdminException + * Unexpected error + */ + FunctionStatus getSinkStatus(String tenant, String namespace, String sink, int id) + throws PulsarAdminException; + + /** + * Restart sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Restart all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + + /** + * Stop sink instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @param instanceId + * Sink instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException; + + /** + * Stop all sink instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param sink + * Sink name + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sinks currently running in cluster mode + * + * @return the built-in sink connector definitions + * + * @throws PulsarAdminException + * Unexpected error + * + */ + List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException; +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java new file mode 100644 index 0000000..3c43cf2 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; +import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.utils.SourceConfig; + +import java.util.List; +import java.util.Set; + +/** + * Admin interface for Source management. + */ +public interface Source { + /** + * Get the list of sources. + * <p> + * Get the names of all the Pulsar Sources in the given tenant and namespace.
+ * <p> + * Response Example: + * + * <pre> + * <code>["f1", "f2", "f3"]</code> + * </pre> + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * + * @return the list of source names + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws PulsarAdminException + * Unexpected error + */ + List<String> getSources(String tenant, String namespace) throws PulsarAdminException; + + /** + * Get the configuration for the specified source. + * <p> + * The configuration is returned as a {@link Function.FunctionDetails} message. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @return the source configuration + * + * @throws NotAuthorizedException + * You don't have admin permission to get the configuration of the source + * @throws NotFoundException + * Source doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Create a new source. + * + * @param sourceConfig + * the source configuration object + * @param fileName + * path of the source package file to upload; the bundled SourceImpl uploads nothing when this is + * null or starts with <code>builtin://</code> + * + * @throws PulsarAdminException + * Unexpected error + */ + void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; + + /** + * <pre> + * Create a new source by providing url from which the source package can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * </pre> + * + * @param sourceConfig + * the source configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws PulsarAdminException + * Unexpected error + */ + void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Update the configuration for a source.
+ * + * @param sourceConfig + * the source configuration object + * @param fileName + * path of the source package file to upload (TODO confirm the update path treats null and + * <code>builtin://</code> values the same way as creation) + * + * @throws NotAuthorizedException + * You don't have admin permission to update the source + * @throws NotFoundException + * Source doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException; + + /** + * Update the configuration for a source. + * <pre> + * Update a source by providing url from which the source package can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * </pre> + * + * @param sourceConfig + * the source configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @throws NotAuthorizedException + * You don't have admin permission to update the source + * @throws NotFoundException + * Source doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException; + + /** + * Delete an existing source. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws NotAuthorizedException + * You don't have admin permission + * @throws NotFoundException + * Source does not exist + * @throws PreconditionFailedException + * A precondition for deleting the source failed (TODO confirm the exact condition; the previous + * wording was inherited from the cluster API) + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Gets the current status of a source. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @return the status list reported for the source + * + * @throws PulsarAdminException + * Unexpected error + */ + FunctionStatusList getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Gets the current status of a source instance.
+ * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * @param id + * Source instance-id + * @return the status of the given source instance + * @throws PulsarAdminException + * Unexpected error + */ + FunctionStatus getSourceStatus(String tenant, String namespace, String source, int id) + throws PulsarAdminException; + + /** + * Restart source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Restart all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartSource(String tenant, String namespace, String source) throws PulsarAdminException; + + + /** + * Stop source instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @param instanceId + * Source instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException; + + /** + * Stop all source instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param source + * Source name + * + * @throws PulsarAdminException + * Unexpected error + */ + void stopSource(String tenant, String namespace, String source) throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sources currently running in cluster mode + * + * @return the built-in source connector definitions + * + * @throws PulsarAdminException + * Unexpected error + * + */ + List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException; +} diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java new file mode 100644 index 0000000..b74f4ff --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.admin.internal; + +import com.google.gson.Gson; +import com.google.protobuf.AbstractMessage.Builder; +import com.google.protobuf.util.JsonFormat; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Sink; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; +import org.apache.pulsar.functions.utils.SinkConfig; +import org.glassfish.jersey.media.multipart.FormDataBodyPart; +import org.glassfish.jersey.media.multipart.FormDataMultiPart; +import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; + +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * REST-backed implementation of {@link Sink}. + * <p> + * Every request is built from the <code>/admin/sink</code> path of the supplied web target, and any + * exception raised while issuing it is converted with <code>getApiException</code>. + * <p> + * NOTE(review): non-200 replies are wrapped in {@link ClientErrorException}, whose constructor is + * documented to reject responses outside the 4xx family; confirm how 5xx replies are meant to surface. + */ +@Slf4j +public class SinkImpl extends BaseResource implements Sink { + + private final WebTarget sink; + + /** + * @param web + * root web target; the sink endpoints are resolved relative to it + * @param auth + * authentication handed to {@link BaseResource} + */ + public SinkImpl(WebTarget web, Authentication auth) { + super(auth); + this.sink = web.path("/admin/sink"); + } + + @Override + public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException { + try { + Response response = request(sink.path(tenant).path(namespace)).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType<List<String>>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + 
@Override + public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + mergeJson(jsonResponse, functionDetailsBuilder); + return functionDetailsBuilder.build(); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public FunctionStatusList getSinkStatus( + String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + Response response = request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); + FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder(); + mergeJson(jsonResponse, functionStatusBuilder); + return functionStatusBuilder.build(); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public FunctionStatus getSinkStatus( + String tenant, String namespace, String sinkName, int id) throws PulsarAdminException { + try { + Response response = request( + sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status")) + .get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + String jsonResponse = response.readEntity(String.class); + FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder(); + mergeJson(jsonResponse, functionStatusBuilder); + return functionStatusBuilder.build(); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void 
createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException { + try { + final FormDataMultiPart mp = new FormDataMultiPart(); + + if (fileName != null && !fileName.startsWith("builtin://")) { + /* If the sink package is built in, we don't need to submit it here */ + mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + } + + mp.bodyPart(new FormDataBodyPart("sinkConfig", + new Gson().toJson(sinkConfig), + MediaType.APPLICATION_JSON_TYPE)); + request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName())) + .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException { + try { + final FormDataMultiPart mp = new FormDataMultiPart(); + + mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + + mp.bodyPart(new FormDataBodyPart("sinkConfig", + new Gson().toJson(sinkConfig), + MediaType.APPLICATION_JSON_TYPE)); + request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName())) + .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void deleteSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName)) + .delete(ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException { + try { + final FormDataMultiPart mp = new FormDataMultiPart(); + + if (fileName != null && !fileName.startsWith("builtin://")) { + /* If the sink package is built in, we don't need to submit it here */ + mp.bodyPart(new 
FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + } + + mp.bodyPart(new FormDataBodyPart("sinkConfig", + new Gson().toJson(sinkConfig), + MediaType.APPLICATION_JSON_TYPE)); + request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName())) + .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException { + try { + final FormDataMultiPart mp = new FormDataMultiPart(); + + mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + + mp.bodyPart(new FormDataBodyPart("sinkConfig", new Gson().toJson(sinkConfig), + MediaType.APPLICATION_JSON_TYPE)); + request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()) + .path(sinkConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), + ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void restartSink(String tenant, String namespace, String sinkName, int instanceId) + throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId)) + .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void restartSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName).path("restart")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void stopSink(String tenant, String namespace, String sinkName, int instanceId) + throws PulsarAdminException { + try { + 
request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId)) + .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void stopSink(String tenant, String namespace, String sinkName) throws PulsarAdminException { + try { + request(sink.path(tenant).path(namespace).path(sinkName).path("stop")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException { + try { + Response response = request(sink.path("builtinsink")).get(); + if (!response.getStatusInfo().equals(Response.Status.OK)) { + throw new ClientErrorException(response); + } + return response.readEntity(new GenericType<List<ConnectorDefinition>>() {}); + } catch (Exception e) { + throw getApiException(e); + } + } + + + /** + * Merges a JSON document into a protobuf builder using {@link JsonFormat}'s parser. + * + * @param json + * the JSON text to parse + * @param builder + * the builder that receives the parsed fields + * @throws IOException + * if the JSON cannot be merged into the builder + */ + public static void mergeJson(String json, Builder builder) throws IOException { + JsonFormat.parser().merge(json, builder); + } + +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java new file mode 100644 index 0000000..3ae0a6f --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * REST client for the source endpoints rooted at {@code /admin/source}.
+ *
+ * <p>Every public operation routes its failures through {@code getApiException}, so the only
+ * checked exception declared to callers is {@link PulsarAdminException}.
+ */
+@Slf4j
+public class SourceImpl extends BaseResource implements Source {
+
+    /** Archive prefix for which no package file is attached to create/update requests. */
+    private static final String BUILTIN_PREFIX = "builtin://";
+
+    /** Shared serializer for {@link SourceConfig}; Gson instances are safe to reuse across threads. */
+    private static final Gson GSON = new Gson();
+
+    /** Base target ({@code <service>/admin/source}) that every request path is appended to. */
+    private final WebTarget source;
+
+    /**
+     * Creates a client bound to the {@code /admin/source} path of the given web target.
+     *
+     * @param web  root web target of the admin service
+     * @param auth authentication handed to {@code BaseResource}
+     */
+    public SourceImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.source = web.path("/admin/source");
+    }
+
+    /**
+     * Lists the names returned by {@code GET /admin/source/{tenant}/{namespace}}.
+     *
+     * @param tenant    tenant path segment
+     * @param namespace namespace path segment
+     * @return the list of names deserialized from the response body
+     * @throws PulsarAdminException if the request fails or the status is not 200
+     */
+    @Override
+    public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+        try {
+            Response response = request(source.path(tenant).path(namespace)).get();
+            ensureOk(response);
+            return response.readEntity(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Fetches {@code GET /admin/source/{tenant}/{namespace}/{sourceName}} and parses the JSON body
+     * into a {@link FunctionDetails} message.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @return the parsed details
+     * @throws PulsarAdminException if the request fails, the status is not 200, or the body cannot be parsed
+     */
+    @Override
+    public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
+            ensureOk(response);
+            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            mergeJson(response.readEntity(String.class), functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Fetches {@code GET /admin/source/{tenant}/{namespace}/{sourceName}/status} and parses the JSON
+     * body into a {@link FunctionStatusList} message.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @return the parsed status list
+     * @throws PulsarAdminException if the request fails, the status is not 200, or the body cannot be parsed
+     */
+    @Override
+    public FunctionStatusList getSourceStatus(
+            String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            Response response = request(source.path(tenant).path(namespace).path(sourceName).path("status")).get();
+            ensureOk(response);
+            FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
+            mergeJson(response.readEntity(String.class), functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Fetches {@code GET /admin/source/{tenant}/{namespace}/{sourceName}/{id}/status} and parses the
+     * JSON body into a {@link FunctionStatus} message.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @param id         instance id, sent as a decimal path segment
+     * @return the parsed status
+     * @throws PulsarAdminException if the request fails, the status is not 200, or the body cannot be parsed
+     */
+    @Override
+    public FunctionStatus getSourceStatus(
+            String tenant, String namespace, String sourceName, int id) throws PulsarAdminException {
+        try {
+            Response response = request(
+                    source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status"))
+                    .get();
+            ensureOk(response);
+            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
+            mergeJson(response.readEntity(String.class), functionStatusBuilder);
+            return functionStatusBuilder.build();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs a multipart form holding the JSON-serialized config and, unless {@code fileName} is
+     * {@code null} or starts with {@code builtin://}, the file contents as the {@code data} part.
+     *
+     * @param sourceConfig configuration; its tenant, namespace and name form the request path
+     * @param fileName     path of the package file, may be {@code null}
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = archiveForm(sourceConfig, fileName);
+            request(configTarget(sourceConfig))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs a multipart form holding {@code pkgUrl} as the {@code url} part and the JSON-serialized
+     * config as the {@code sourceConfig} part.
+     *
+     * @param sourceConfig configuration; its tenant, namespace and name form the request path
+     * @param pkgUrl       package URL sent as plain text
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = urlForm(sourceConfig, pkgUrl);
+            request(configTarget(sourceConfig))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Issues {@code DELETE /admin/source/{tenant}/{namespace}/{sourceName}}.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void deleteSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName))
+                    .delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * PUTs the same multipart form that {@link #createSource(SourceConfig, String)} POSTs.
+     *
+     * @param sourceConfig configuration; its tenant, namespace and name form the request path
+     * @param fileName     path of the package file, may be {@code null}
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = archiveForm(sourceConfig, fileName);
+            request(configTarget(sourceConfig))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * PUTs the same multipart form that {@link #createSourceWithUrl(SourceConfig, String)} POSTs.
+     *
+     * @param sourceConfig configuration; its tenant, namespace and name form the request path
+     * @param pkgUrl       package URL sent as plain text
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = urlForm(sourceConfig, pkgUrl);
+            request(configTarget(sourceConfig))
+                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs an empty JSON entity to {@code .../{sourceName}/{instanceId}/restart}.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id, sent as a decimal path segment
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void restartSource(String tenant, String namespace, String sourceName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+                    .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs an empty JSON entity to {@code .../{sourceName}/restart}.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void restartSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path("restart"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs an empty JSON entity to {@code .../{sourceName}/{instanceId}/stop}.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @param instanceId instance id, sent as a decimal path segment
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName, int instanceId)
+            throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+                    .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * POSTs an empty JSON entity to {@code .../{sourceName}/stop}.
+     *
+     * @param tenant     tenant path segment
+     * @param namespace  namespace path segment
+     * @param sourceName source name path segment
+     * @throws PulsarAdminException if the request fails
+     */
+    @Override
+    public void stopSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+        try {
+            request(source.path(tenant).path(namespace).path(sourceName).path("stop"))
+                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Fetches {@code GET /admin/source/builtinsource} and deserializes the connector definitions.
+     *
+     * @return the connector definitions read from the response body
+     * @throws PulsarAdminException if the request fails or the status is not 200
+     */
+    @Override
+    public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException {
+        try {
+            Response response = request(source.path("builtinsource")).get();
+            ensureOk(response);
+            return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    /**
+     * Merges a JSON document into the given protobuf builder.
+     *
+     * @param json    JSON text to parse
+     * @param builder builder that receives the parsed fields
+     * @throws IOException if the parser rejects the JSON
+     */
+    public static void mergeJson(String json, Builder<?> builder) throws IOException {
+        JsonFormat.parser().merge(json, builder);
+    }
+
+    /**
+     * Returns normally for a 200 response and otherwise throws an exception matching the status family.
+     *
+     * <p>{@link ClientErrorException} only accepts 4xx responses and {@link ServerErrorException} only
+     * 5xx ones (their constructors throw {@link IllegalArgumentException} for anything else), so the
+     * family is checked before choosing which one to build.
+     */
+    private static void ensureOk(Response response) {
+        if (response.getStatusInfo().equals(Response.Status.OK)) {
+            return;
+        }
+        Response.Status.Family family = response.getStatusInfo().getFamily();
+        if (family == Response.Status.Family.CLIENT_ERROR) {
+            throw new ClientErrorException(response);
+        }
+        if (family == Response.Status.Family.SERVER_ERROR) {
+            throw new ServerErrorException(response);
+        }
+        // Neither 200 nor an error family: nothing will consume the body, so release it.
+        int status = response.getStatus();
+        response.close();
+        throw new IllegalStateException("Unexpected HTTP status " + status + " from source endpoint");
+    }
+
+    /** Resolves {@code /admin/source/{tenant}/{namespace}/{name}} from the config's own fields. */
+    private WebTarget configTarget(SourceConfig sourceConfig) {
+        return source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName());
+    }
+
+    /** Builds the {@code sourceConfig} form part carrying the config serialized as JSON. */
+    private static FormDataBodyPart configPart(SourceConfig sourceConfig) {
+        return new FormDataBodyPart("sourceConfig", GSON.toJson(sourceConfig), MediaType.APPLICATION_JSON_TYPE);
+    }
+
+    /** Builds the form for file-based create/update: optional {@code data} part, then the config part. */
+    private static FormDataMultiPart archiveForm(SourceConfig sourceConfig, String fileName) {
+        final FormDataMultiPart mp = new FormDataMultiPart();
+        if (fileName != null && !fileName.startsWith(BUILTIN_PREFIX)) {
+            // If the function code is built in, we don't need to submit here
+            mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+        }
+        mp.bodyPart(configPart(sourceConfig));
+        return mp;
+    }
+
+    /** Builds the form for URL-based create/update: {@code url} part, then the config part. */
+    private static FormDataMultiPart urlForm(SourceConfig sourceConfig, String pkgUrl) {
+        final FormDataMultiPart mp = new FormDataMultiPart();
+        mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+        mp.bodyPart(configPart(sourceConfig));
+        return mp;
+    }
+
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index bd5af79..e56a343 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -79,7 +80,7 @@ public class CmdSinks extends CmdBase { jcommander.addCommand("update", updateSink); jcommander.addCommand("delete", deleteSink); jcommander.addCommand("localrun", localSinkRunner); - jcommander.addCommand("available-sinks", new ListSinks()); + jcommander.addCommand("available-sinks", new ListBuiltInSinks()); } /** @@ -187,9 +188,9 @@ public class CmdSinks extends CmdBase { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); + admin.sink().createSinkWithUrl(sinkConfig, sinkConfig.getArchive()); } else { - admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); + admin.sink().createSink(sinkConfig, sinkConfig.getArchive()); } print("Created successfully"); } @@ -200,9 +201,9 @@ public class CmdSinks extends CmdBase { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(archive)) { - admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); + admin.sink().updateSinkWithUrl(sinkConfig, sinkConfig.getArchive()); } else { - admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive()); + admin.sink().updateSink(sinkConfig, sinkConfig.getArchive()); } print("Updated successfully"); } @@ -282,6 +283,8 @@ public class CmdSinks extends CmdBase { protected SinkConfig sinkConfig; + protected NarClassLoader classLoader; + private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = 
DEPRECATED_subsName; if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern; @@ -449,7 +452,6 @@ public class CmdSinks extends CmdBase { } // if jar file is present locally then load jar and validate SinkClass in it - ClassLoader classLoader = null; if (archivePath != null) { if (!fileExists(archivePath)) { throw new ParameterException("Archive file " + archivePath + " does not exist"); @@ -482,14 +484,14 @@ public class CmdSinks extends CmdBase { throws IOException { org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder); + Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, classLoader)), functionDetailsBuilder); return functionDetailsBuilder.build(); } protected String validateSinkType(String sinkType) throws IOException { Set<String> availableSinks; try { - availableSinks = admin.functions().getSinks(); + availableSinks = admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); } catch (PulsarAdminException e) { throw new IOException(e); } @@ -533,16 +535,16 @@ public class CmdSinks extends CmdBase { @Override void runCmd() throws Exception { - admin.functions().deleteFunction(tenant, namespace, name); + admin.sink().deleteSink(tenant, namespace, name); print("Deleted successfully"); } } @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster") - public class ListSinks extends BaseCommand { + public class ListBuiltInSinks extends BaseCommand { @Override void runCmd() throws Exception { - admin.functions().getConnectorsList().stream().filter(x -> isNotBlank(x.getSinkClass())) + admin.sink().getBuiltInSinks().stream().filter(x -> isNotBlank(x.getSinkClass())) .forEach(connector -> { 
System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 5d0c84c..f2d7681 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -37,6 +37,7 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -79,7 +80,7 @@ public class CmdSources extends CmdBase { jcommander.addCommand("update", updateSource); jcommander.addCommand("delete", deleteSource); jcommander.addCommand("localrun", localSourceRunner); - jcommander.addCommand("available-sources", new ListSources()); + jcommander.addCommand("available-sources", new ListBuiltInSources()); } /** @@ -187,9 +188,9 @@ public class CmdSources extends CmdBase { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) { - admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); + admin.source().createSourceWithUrl(sourceConfig, sourceConfig.getArchive()); } else { - admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); + admin.source().createSource(sourceConfig, sourceConfig.getArchive()); } print("Created successfully"); } @@ -200,9 +201,9 @@ public class CmdSources extends CmdBase { @Override void runCmd() throws Exception { if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) { - admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); + admin.source().updateSourceWithUrl(sourceConfig, sourceConfig.getArchive()); } 
else { - admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive()); + admin.source().updateSource(sourceConfig, sourceConfig.getArchive()); } print("Updated successfully"); } @@ -267,6 +268,8 @@ public class CmdSources extends CmdBase { protected SourceConfig sourceConfig; + protected NarClassLoader classLoader; + private void mergeArgs() { if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName; @@ -403,7 +406,6 @@ public class CmdSources extends CmdBase { // if jar file is present locally then load jar and validate SinkClass in it - ClassLoader classLoader = null; if (archivePath != null) { if (!fileExists(archivePath)) { throw new ParameterException("Archive file " + archivePath + " does not exist"); @@ -436,14 +438,14 @@ public class CmdSources extends CmdBase { throws IOException { org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)), functionDetailsBuilder); + Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, classLoader)), functionDetailsBuilder); return functionDetailsBuilder.build(); } protected String validateSourceType(String sourceType) throws IOException { Set<String> availableSources; try { - availableSources = admin.functions().getSources(); + availableSources = admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet()); } catch (PulsarAdminException e) { throw new IOException(e); } @@ -487,16 +489,16 @@ public class CmdSources extends CmdBase { @Override void runCmd() throws Exception { - admin.functions().deleteFunction(tenant, namespace, name); + admin.source().deleteSource(tenant, 
namespace, name); print("Delete source successfully"); } } @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster") - public class ListSources extends BaseCommand { + public class ListBuiltInSources extends BaseCommand { @Override void runCmd() throws Exception { - admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) + admin.source().getBuiltInSources().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) .forEach(connector -> { System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80));