This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ee81c7e7ad6a7b4f466ba4abf478545e6d3bccf2 Author: Sanjeev Kulkarni <sanj...@streaml.io> AuthorDate: Thu Oct 4 16:56:26 2018 -0700 First cut of adding new endpoints for source/sink --- .../pulsar/broker/admin/impl/FunctionsBase.java | 4 +- .../apache/pulsar/broker/admin/impl/SinkBase.java | 258 ++++++++++++++++++++ .../pulsar/broker/admin/impl/SourceBase.java | 261 +++++++++++++++++++++ .../org/apache/pulsar/broker/admin/v1/Sink.java | 29 +++ .../org/apache/pulsar/broker/admin/v1/Source.java | 29 +++ .../org/apache/pulsar/broker/admin/v2/Sink.java | 34 +++ .../org/apache/pulsar/broker/admin/v2/Source.java | 34 +++ .../runtime/KubernetesRuntimeFactory.java | 3 +- .../src/main/resources/java_instance_log4j2.yml | 4 +- .../pulsar/functions/worker/rest/Resources.java | 4 + .../functions/worker/rest/api/FunctionsImpl.java | 64 +++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 4 +- .../worker/rest/api/v2/SinkApiV2Resource.java | 180 ++++++++++++++ .../worker/rest/api/v2/SourceApiV2Resource.java | 180 ++++++++++++++ 14 files changed, 1060 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index cff3c34..d20bc62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -84,7 +84,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); } @PUT @@ -106,7 +106,7 @@ public class FunctionsBase extends AdminResource implements 
Supplier<WorkerServi final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java new file mode 100644 index 0000000..43ca8bf --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.impl; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +public class SinkBase extends AdminResource implements Supplier<WorkerService> { + + private final FunctionsImpl functions; + + public SinkBase() { + this.functions = new FunctionsImpl(this); + } + + @Override + public WorkerService get() { + return pulsar().getWorkerService(); + } + + @POST + @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "Pulsar Function successfully created") + }) + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream 
uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sinkConfig") String sinkConfigJson) { + + return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + } + + @PUT + @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Function successfully updated") + }) + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sinkConfig") String sinkConfigJson) { + + return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + + } + + + @DELETE + @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function doesn't exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "The function was successfully deleted") + }) + @Path("/{tenant}/{namespace}/{sinkName}") + public Response deregisterSink(final 
@PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) { + return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId()); + } + + @GET + @ApiOperation( + value = "Fetches information about a Pulsar Sink currently running in cluster mode", + response = FunctionMetaData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{sinkName}") + public Response getSinkInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) throws IOException { + return functions.getFunctionInfo(tenant, namespace, sinkName); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Sink instance", + response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status") + public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionInstanceStatus( + tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Sink running in cluster mode", + response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + 
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{sinkName}/status") + public Response getSinkStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) throws IOException { + return functions.getFunctionStatus(tenant, namespace, sinkName); + } + + @GET + @ApiOperation( + value = "Lists all Pulsar Sinks currently deployed in a given namespace", + response = String.class, + responseContainer = "Collection" + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}") + public Response listSinks(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions(tenant, namespace); + + } + + @POST + @ApiOperation(value = "Restart sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Restart all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + 
@Path("/{tenant}/{namespace}/{sinkName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { + return functions.restartFunctionInstances(tenant, namespace, sinkName); + } + + @POST + @ApiOperation(value = "Stop sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Stop all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { + return functions.stopFunctionInstances(tenant, namespace, sinkName); + } + + @GET + @ApiOperation( + value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode", + response = List.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = 
"Invalid request"), + @ApiResponse(code = 408, message = "Request timeout") + }) + @Path("/builtinsinks") + public List<ConnectorDefinition> getSinkList() { + List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors(); + List<ConnectorDefinition> retval = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) { + retval.add(connectorDefinition); + } + } + return retval; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java new file mode 100644 index 0000000..b4428ab --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.impl; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +public class SourceBase extends AdminResource implements Supplier<WorkerService> { + + private final FunctionsImpl functions; + + public SourceBase() { + this.functions = new FunctionsImpl(this); + } + + @Override + public WorkerService get() { + return pulsar().getWorkerService(); + } + + @POST + @ApiOperation(value = "Creates a new Pulsar Source in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "Pulsar Function successfully created") + }) + @Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") 
String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { + + return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + } + + @PUT + @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"), + @ApiResponse(code = 200, message = "Pulsar Function successfully updated") + }) + @Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { + + return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + + } + + + @DELETE + @ApiOperation(value = "Deletes a Pulsar Source currently running in cluster mode") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function doesn't exist"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 200, message = "The function was successfully deleted") + }) + 
@Path("/{tenant}/{namespace}/{sourceName}") + public Response deregisterSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) { + return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId()); + } + + @GET + @ApiOperation( + value = "Fetches information about a Pulsar Source currently running in cluster mode", + response = FunctionMetaData.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{sourceName}") + public Response getSourceInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) throws IOException { + return functions.getFunctionInfo( + tenant, namespace, sourceName); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Source instance", + response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 404, message = "The function doesn't exist") + }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status") + public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionInstanceStatus( + tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + } + + @GET + @ApiOperation( + value = "Displays the status of a Pulsar Source running in cluster mode", + 
response = FunctionStatus.class + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}/{sourceName}/status") + public Response getSourceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) throws IOException { + return functions.getFunctionStatus(tenant, namespace, sourceName); + } + + @GET + @ApiOperation( + value = "Lists all Pulsar Sources currently deployed in a given namespace", + response = String.class, + responseContainer = "Collection" + ) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions") + }) + @Path("/{tenant}/{namespace}") + public Response listSources(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions( + tenant, namespace); + + } + + @POST + @ApiOperation(value = "Restart source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Restart all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = 
"Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { + return functions.restartFunctionInstances(tenant, namespace, sourceName); + } + + @POST + @ApiOperation(value = "Stop source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Stop all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { + return functions.stopFunctionInstances(tenant, namespace, sourceName); + } + + @GET + @ApiOperation( + value = "Fetches a list of supported Pulsar IO source connectors currently running in 
cluster mode", + response = List.class + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), + @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 408, message = "Request timeout") + }) + @Path("/builtinsources") + public List<ConnectorDefinition> getSourceList() { + List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors(); + List<ConnectorDefinition> retval = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) { + retval.add(connectorDefinition); + } + } + return retval; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java new file mode 100644 index 0000000..3d73d4d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.v1; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SinkBase; + +import javax.ws.rs.Path; + +@Path("/sink") +@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = true) +public class Sink extends SinkBase { +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java new file mode 100644 index 0000000..99b41dc --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.v1; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SourceBase; + +import javax.ws.rs.Path; + +@Path("/source") +@Api(value = "/source", description = "Source admin apis", tags = "source", hidden = true) +public class Source extends SourceBase { +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java new file mode 100644 index 0000000..aea0ae7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.v2; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SinkBase; + +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/sink") +@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = true) +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class Sink extends SinkBase { +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java new file mode 100644 index 0000000..e5ef56c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin.v2; + +import io.swagger.annotations.Api; +import org.apache.pulsar.broker.admin.impl.SourceBase; + +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/source") +@Api(value = "/source", description = "Source admin apis", tags = "source", hidden = true) +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class Source extends SourceBase { +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 8ba7505..cba9ebf 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -141,7 +141,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { KubernetesRuntime.doChecks(functionDetails); } - private void setupClient() throws Exception { + @VisibleForTesting + void setupClient() throws Exception { if (appsClient == null) { if (k8Uri == null) { log.info("k8Uri is null thus going by defaults"); diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml index df25367..9846f05 100644 --- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml +++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml @@ -44,7 +44,7 @@ Configuration: name: RollingFile fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log" filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz" - immediateFlush: false + immediateFlush: true PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} 
- %msg%n" Policies: @@ -72,7 +72,7 @@ Configuration: name: BkRollingFile fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk" filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz" - immediateFlush: false + immediateFlush: true PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" Policies: diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index 26c7127..ac011db 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker.rest; import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource; +import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; import org.glassfish.jersey.media.multipart.MultiPartFeature; @@ -36,6 +38,8 @@ public final class Resources { return new HashSet<>( Arrays.asList( FunctionApiV2Resource.class, + SourceApiV2Resource.class, + SinkApiV2Resource.class, WorkerApiV2Resource.class, MultiPartFeature.class )); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 3124731..0d30e37 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -27,7 +27,6 @@ import static org.apache.pulsar.functions.utils.Reflections.loadJar; import static org.apache.pulsar.functions.utils.Utils.FILE; import static org.apache.pulsar.functions.utils.Utils.HTTP; import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; -import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; import com.google.gson.Gson; @@ -39,10 +38,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.CopyOption; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.util.Base64; import java.util.Collection; import java.util.LinkedList; @@ -89,9 +85,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.FunctionConfigUtils; -import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; +import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; @@ -136,6 +130,7 @@ public class FunctionsImpl { public Response registerFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, + final String sourceConfigJson, final String sinkConfigJson, 
final String clientRole) { if (!isWorkerServiceAvailable()) { @@ -173,10 +168,10 @@ public class FunctionsImpl { try { if (isPkgUrlProvided) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson, functionConfigJson); + functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); } else { functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, functionConfigJson); + fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); } } catch (Exception e) { log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); @@ -216,7 +211,7 @@ public class FunctionsImpl { public Response updateFunction(final String tenant, final String namespace, final String functionName, final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson, - final String clientRole) { + final String sourceConfigJson, final String sinkConfigJson, final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -252,10 +247,10 @@ public class FunctionsImpl { try { if (isPkgUrlProvided) { functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, - functionDetailsJson, functionConfigJson); + functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); } else { functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile, - fileDetail, functionDetailsJson, functionConfigJson); + fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson); } } catch (Exception e) { log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e); @@ -877,23 +872,24 @@ public class 
FunctionsImpl { } private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName, - String functionPkgUrl, String functionDetailsJson, String functionConfigJson) + String functionPkgUrl, String functionDetailsJson, String functionConfigJson, + String sourceConfigJson, String sinkConfigJson) throws IllegalArgumentException, IOException, URISyntaxException { if (!isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, functionConfigJson, functionPkgUrl, null); + functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null); return functionDetails; } private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson, - String functionConfigJson) + String functionConfigJson, String sourceConfigJson, String sinkConfigJson) throws IllegalArgumentException, IOException, URISyntaxException { FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - functionDetailsJson, functionConfigJson, null, uploadedInputStreamAsFile); + functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile); if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) { throw new IllegalArgumentException("Function Package is not provided"); } @@ -966,7 +962,8 @@ public class FunctionsImpl { } private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, - String functionDetailsJson, String functionConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, 
URISyntaxException { + String functionDetailsJson, String functionConfigJson, String sourceConfigJson, + String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -977,11 +974,24 @@ public class FunctionsImpl { throw new IllegalArgumentException("Function Name is not provided"); } - if (StringUtils.isEmpty(functionDetailsJson) && StringUtils.isEmpty(functionConfigJson)) { - throw new IllegalArgumentException("FunctionConfig is not provided"); + int numDefinitions = 0; /* number of non-empty definitions supplied; exactly one of details/function/source/sink config is required */ + if (!StringUtils.isEmpty(functionDetailsJson)) { + numDefinitions++; + } + if (!StringUtils.isEmpty(functionConfigJson)) { + numDefinitions++; + } + if (!StringUtils.isEmpty(sourceConfigJson)) { + numDefinitions++; } - if (!StringUtils.isEmpty(functionDetailsJson) && !StringUtils.isEmpty(functionConfigJson)) { - throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided"); + if (!StringUtils.isEmpty(sinkConfigJson)) { + numDefinitions++; + } + if (numDefinitions == 0) { + throw new IllegalArgumentException("Function Info is not provided"); + } + if (numDefinitions > 1) { + throw new IllegalArgumentException("Conflicting Info provided"); } if (!StringUtils.isEmpty(functionConfigJson)) { FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class); @@ -995,6 +1005,18 @@ public class FunctionsImpl { ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader); return FunctionConfigUtils.convert(functionConfig, clsLoader); } + if (!StringUtils.isEmpty(sourceConfigJson)) { + SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class); + ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); + ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); + return 
SourceConfigUtils.convert(sourceConfig, clsLoader); + } + if (!StringUtils.isEmpty(sinkConfigJson)) { + SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class); + ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); + ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); + return SinkConfigUtils.convert(sinkConfig, clsLoader); + } FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder); if (isNotBlank(functionPkgUrl)) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 1e44a60..a842b9e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -60,7 +60,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @FormDataParam("functionConfig") String functionConfigJson) { return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); } @@ -77,7 +77,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @FormDataParam("functionConfig") String functionConfigJson) { return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail, - functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId()); + functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId()); } diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java new file mode 100644 index 0000000..175e2eb --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.worker.rest.api.v2; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Path("/sink") +public class SinkApiV2Resource extends FunctionApiResource { + + @POST + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String sourcePkgUrl, + final @FormDataParam("sinkConfig") String sinkConfigJson) { + + return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId()); + + } + + @PUT + @Path("/{tenant}/{namespace}/{sinkName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final 
@FormDataParam("sinkConfig") String sinkConfigJson) { + + return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); + + } + + + @DELETE + @Path("/{tenant}/{namespace}/{sinkName}") + public Response deregisterSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { + return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId()); + } + + @GET + @Path("/{tenant}/{namespace}/{sinkName}") + public Response getSinkInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) + throws IOException { + return functions.getFunctionInfo(tenant, namespace, sinkName); + } + + @GET + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status") + public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionInstanceStatus( + tenant, namespace, sinkName, instanceId, uri.getRequestUri()); + } + + @GET + @Path("/{tenant}/{namespace}/{sinkName}/status") + public Response getSinkStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sinkName") String sinkName) throws IOException { + return functions.getFunctionStatus(tenant, namespace, sinkName); + } + + @GET + @Path("/{tenant}/{namespace}") + public Response listSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions(tenant, namespace); + + } + + @POST + @ApiOperation(value = "Restart sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Restart all sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { + return functions.restartFunctionInstances(tenant, namespace, sinkName); + } + + @POST + @ApiOperation(value = "Stop sink instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Stop all 
sink instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sinkName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSink(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) { + return functions.stopFunctionInstances(tenant, namespace, sinkName); + } + + @GET + @Path("/builtinsinks") + public List<ConnectorDefinition> getSinkList() { + List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors(); + List<ConnectorDefinition> retval = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) { + retval.add(connectorDefinition); + } + } + return retval; + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java new file mode 100644 index 0000000..9d25bf9 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api.v2; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Path("/source") +public class SourceApiV2Resource extends FunctionApiResource { + + @POST + @Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response registerSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String sourcePkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { + + return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + sourcePkgUrl, null, null, sourceConfigJson, null, clientAppId()); + + } + + @PUT + 
@Path("/{tenant}/{namespace}/{sourceName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + public Response updateSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { + + return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, + functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); + + } + + + @DELETE + @Path("/{tenant}/{namespace}/{sourceName}") + public Response deregisterSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { + return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId()); + } + + @GET + @Path("/{tenant}/{namespace}/{sourceName}") + public Response getSourceInfo(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName) + throws IOException { + return functions.getFunctionInfo(tenant, namespace, sourceName); + } + + @GET + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status") + public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) throws IOException { + return functions.getFunctionInstanceStatus( + tenant, namespace, sourceName, instanceId, uri.getRequestUri()); + } + + @GET + @Path("/{tenant}/{namespace}/{sourceName}/status") + public Response getSourceStatus(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, + final 
@PathParam("sourceName") String sourceName) throws IOException { + return functions.getFunctionStatus(tenant, namespace, sourceName); + } + + @GET + @Path("/{tenant}/{namespace}") + public Response listSources(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace) { + return functions.listFunctions(tenant, namespace); + + } + + @POST + @ApiOperation(value = "Restart source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Restart all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { + return functions.restartFunctionInstances(tenant, namespace, sourceName); + } + + @POST + @ApiOperation(value = "Stop source instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), 
+ @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName, + final @PathParam("instanceId") String instanceId) { + return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri()); + } + + @POST + @ApiOperation(value = "Stop all source instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{sourceName}/stop") + @Consumes(MediaType.APPLICATION_JSON) + public Response stopSource(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) { + return functions.stopFunctionInstances(tenant, namespace, sourceName); + } + + @GET + @Path("/builtinsources") + public List<ConnectorDefinition> getSourceList() { + List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors(); + List<ConnectorDefinition> retval = new ArrayList<>(); + for (ConnectorDefinition connectorDefinition : connectorDefinitions) { + if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) { + retval.add(connectorDefinition); + } + } + return retval; + } +}