This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6df595a Fix Spellings and Code Cleanup (#3181) 6df595a is described below commit 6df595a341a884404242b8d6dcf0d686e71907ee Author: Ali Ahmed <alahmed...@gmail.com> AuthorDate: Thu Dec 13 06:15:34 2018 -0800 Fix Spellings and Code Cleanup (#3181) * Fix Spellings and Code Cleanup * Fix Code comments * Remove Tenant check * Fix test cases --- .../pulsar/broker/admin/impl/BrokerStatsBase.java | 2 +- .../pulsar/broker/admin/impl/BrokersBase.java | 2 +- .../pulsar/broker/admin/impl/ClustersBase.java | 4 +- .../broker/admin/impl/ResourceQuotasBase.java | 4 +- .../apache/pulsar/broker/admin/v2/BrokerStats.java | 2 +- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +- .../org/apache/pulsar/admin/cli/CmdSources.java | 2 +- .../apache/pulsar/admin/cli/TestCmdSources.java | 4 +- .../functions/worker/rest/api/ComponentImpl.java | 246 ++++++++++++++------- .../functions/worker/rest/api/FunctionsImpl.java | 22 +- .../worker/rest/api/FunctionsMetricsResource.java | 3 +- .../pulsar/functions/worker/rest/api/SinkImpl.java | 33 ++- .../functions/worker/rest/api/SourceImpl.java | 28 ++- .../functions/worker/rest/api/WorkerImpl.java | 25 ++- 14 files changed, 246 insertions(+), 133 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java index 03fc9a2..43b9ba4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java @@ -87,7 +87,7 @@ public class BrokerStatsBase extends AdminResource { @GET @Path("/destinations") - @ApiOperation(value = "Get all the topic stats by namesapce", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558 + @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558 // map // support // missing diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index c4d504c..900ad7c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -117,7 +117,7 @@ public class BrokersBase extends AdminResource { @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"), @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") }) - public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{ + public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception { validateSuperUserAccess(); updateDynamicConfigurationOnZk(configName, configValue); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 3fcb7d6..b65d268 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -344,7 +344,7 @@ public class ClustersBase extends AdminResource { .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); - // construct the response to NamespaceisolationData map + // construct the response to Namespace isolation data map return nsIsolationPolicies.getPolicies(); } catch (Exception e) { log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e); @@ -368,7 +368,7 @@ public class ClustersBase extends AdminResource { .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); - // construct the response to NamespaceisolationData map + // construct the response to Namespace isolation data map if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) { log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", policyName, cluster); throw new RestException(Status.NOT_FOUND, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java index 2c664f0..3f502da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java @@ -94,7 +94,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase { } catch (KeeperException.NoNodeException e) { log.warn("[{}] Failed to set resource quota for namespace bundle {}: concurrent modification", clientAppId(), nsBundle.toString()); - throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota"); + throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota"); } catch (Exception e) { log.error("[{}] Failed to set resource quota for namespace bundle {}", clientAppId(), nsBundle.toString()); throw new RestException(e); @@ -123,7 +123,7 @@ public abstract class ResourceQuotasBase extends NamespacesBase { } catch (KeeperException.NoNodeException e) { log.warn("[{}] Failed to unset resource quota for namespace bundle {}: concurrent modification", clientAppId(), nsBundle.toString()); - throw new RestException(Status.CONFLICT, "Cuncurrent modification on namespace bundle quota"); + throw new RestException(Status.CONFLICT, "Concurrent modification on namespace bundle quota"); } catch (Exception e) { log.error("[{}] Failed to unset resource quota for namespace bundle {}", clientAppId(), nsBundle.toString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java index 5fd5783..a04fded 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/BrokerStats.java @@ -43,7 +43,7 @@ public class BrokerStats extends BrokerStatsBase { @GET @Path("/topics") @ApiOperation( - value = "Get all the topic stats by namesapce", + value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558 diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 47ce6e4..5d1b9c0 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -86,7 +86,7 @@ public class CmdSinks extends CmdBase { jcommander.addCommand("delete", deleteSink); jcommander.addCommand("list", listSinks); jcommander.addCommand("get", getSink); - // TODO depecreate getstatus + // TODO deprecate getstatus jcommander.addCommand("status", getSinkStatus, "getstatus"); jcommander.addCommand("stop", stopSink); jcommander.addCommand("restart", restartSink); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 9ec950a..7acd85b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -382,7 +382,7 @@ public class CmdSources extends CmdBase { protected void validateSourceConfigs(SourceConfig sourceConfig) { if (isBlank(sourceConfig.getArchive())) { - throw new ParameterException("Source archive not specfied"); + throw new ParameterException("Source archive not specified"); } org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig); if (!Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()) && diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index 2eb951a..bf9c079 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -178,7 +178,7 @@ public class TestCmdSources { ); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified") public void testMissingArchive() throws Exception { SourceConfig sourceConfig = getSourceConfig(); sourceConfig.setArchive(null); @@ -356,7 +356,7 @@ public class TestCmdSources { testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig); } - @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specfied") + @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Source archive not specified") public void testCmdSourceConfigFileMissingJar() throws Exception { SourceConfig testSourceConfig = getSourceConfig(); testSourceConfig.setArchive(null); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index dd2dce2..fa8f783 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -153,12 +153,16 @@ public abstract class ComponentImpl { public abstract T notScheduledInstance(); - public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId); + public abstract T fromFunctionStatusProto(final InstanceCommunication.FunctionStatus status, + final String assignedWorkerId); - public abstract T notRunning(String assignedWorkerId, String error); + public abstract T notRunning(final String assignedWorkerId, final String error); - public T getComponentInstanceStatus(String tenant, String namespace, - String name, int instanceId, URI uri) { + public T getComponentInstanceStatus(final String tenant, + final String namespace, + final String name, + final int instanceId, + final URI uri) { Function.Assignment assignment; if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { @@ -218,16 +222,23 @@ public abstract class ComponentImpl { } } - public abstract S getStatus(String tenant, String namespace, - String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException; + public abstract S getStatus(final String tenant, + final String namespace, + final String name, + final Collection<Function.Assignment> assignments, + final URI uri) throws PulsarAdminException; - public abstract S getStatusExternal(String tenant, String namespace, - String name, int parallelism); + public abstract S getStatusExternal(final String tenant, + final String namespace, + final String name, + final int parallelism); - public abstract S emptyStatus(int parallelism); + public abstract S emptyStatus(final int parallelism); - public S getComponentStatus(String tenant, String namespace, - String name, URI uri) { + public S getComponentStatus(final String tenant, + final String namespace, + final String name, + final URI uri) { Function.FunctionMetaData functionMetaData = worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name); @@ -291,10 +302,14 @@ public abstract class ComponentImpl { return true; } - public Response registerFunction(final String tenant, final String namespace, final String componentName, - final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson, - + public Response registerFunction(final String tenant, + final String namespace, + final String componentName, + final InputStream uploadedInputStream, + final FormDataContentDisposition fileDetail, + final String functionPkgUrl, + final String functionDetailsJson, + final String componentConfigJson, final String clientRole) { if (!isWorkerServiceAvailable()) { @@ -315,9 +330,11 @@ public abstract class ComponentImpl { } try { - TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant); - String qualifedNamespace = tenant + "/" + namespace; - if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace)) { + // Check tenant exists + final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant); + + String qualifiedNamespace = tenant + "/" + namespace; + if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) { log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, componentName, namespace); return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) @@ -409,10 +426,10 @@ public abstract class ComponentImpl { return updateRequest(functionMetaDataBuilder.build()); } - private PackageLocationMetaData.Builder getFunctionPackageLocation(FunctionDetails functionDetails, - String functionPkgUrl, + private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionDetails functionDetails, + final String functionPkgUrl, final FormDataContentDisposition fileDetail, - File uploadedInputStreamAsFile) throws Exception { + final File uploadedInputStreamAsFile) throws Exception { String tenant = functionDetails.getTenant(); String namespace = functionDetails.getNamespace(); String componentName = functionDetails.getName(); @@ -466,9 +483,14 @@ public abstract class ComponentImpl { } - public Response updateFunction(final String tenant, final String namespace, final String componentName, - final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail, - final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson, + public Response updateFunction(final String tenant, + final String namespace, + final String componentName, + final InputStream uploadedInputStream, + final FormDataContentDisposition fileDetail, + final String functionPkgUrl, + final String functionDetailsJson, + final String componentConfigJson, final String clientRole) { if (!isWorkerServiceAvailable()) { @@ -516,7 +538,7 @@ public abstract class ComponentImpl { FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); existingComponentConfigJson = new Gson().toJson(existingFunctionConfig); FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); - // The rest end points take precendence over whatever is there in functionconfig + // The rest end points take precedence over whatever is there in functionconfig functionConfig.setTenant(tenant); functionConfig.setNamespace(namespace); functionConfig.setName(componentName); @@ -531,7 +553,7 @@ public abstract class ComponentImpl { SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); existingComponentConfigJson = new Gson().toJson(existingSourceConfig); SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); - // The rest end points take precendence over whatever is there in functionconfig + // The rest end points take precedence over whatever is there in functionconfig sourceConfig.setTenant(tenant); sourceConfig.setNamespace(namespace); sourceConfig.setName(componentName); @@ -546,7 +568,7 @@ public abstract class ComponentImpl { SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails()); existingComponentConfigJson = new Gson().toJson(existingSinkConfig); SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); - // The rest end points take precendence over whatever is there in functionconfig + // The rest end points take precedence over whatever is there in functionconfig sinkConfig.setTenant(tenant); sinkConfig.setNamespace(namespace); sinkConfig.setName(componentName); @@ -570,7 +592,6 @@ public abstract class ComponentImpl { if (uploadedInputStream != null) { uploadedInputStreamAsFile = dumpToTmpFile(uploadedInputStream); } - File existingPackageAsFile = null; // validate parameters try { @@ -581,7 +602,8 @@ public abstract class ComponentImpl { functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType); } else { - functionDetails = validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType); + functionDetails = validateUpdateRequestParamsWithExistingMetadata( + tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType); } } catch (Exception e) { log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e); @@ -618,8 +640,10 @@ public abstract class ComponentImpl { return updateRequest(functionMetaDataBuilder.build()); } - public Response deregisterFunction(final String tenant, final String namespace, final String componentName, - String clientRole) { + public Response deregisterFunction(final String tenant, + final String namespace, + final String componentName, + final String clientRole) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -702,8 +726,9 @@ public abstract class ComponentImpl { return Response.status(Status.OK).entity(requestResult.toJson()).build(); } - public Response getFunctionInfo(final String tenant, final String namespace, final String componentName) - throws IOException { + public Response getFunctionInfo(final String tenant, + final String namespace, + final String componentName) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -731,32 +756,42 @@ public abstract class ComponentImpl { .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build(); } - String retval; + String retVal; if (componentType.equals(FUNCTION)) { FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); - retval = new Gson().toJson(config); + retVal = new Gson().toJson(config); } else if (componentType.equals(SOURCE)) { SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); - retval = new Gson().toJson(config); + retVal = new Gson().toJson(config); } else { SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); - retval = new Gson().toJson(config); + retVal = new Gson().toJson(config); } - return Response.status(Status.OK).entity(retval).build(); + return Response.status(Status.OK).entity(retVal).build(); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, - final String instanceId, URI uri) { + public Response stopFunctionInstance(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final URI uri) { return stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri); } - public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName, - final String instanceId, URI uri) { + public Response restartFunctionInstance(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final URI uri) { return stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri); } - public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName, - final String instanceId, boolean restart, URI uri) { + public Response stopFunctionInstance(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final boolean restart, + final URI uri) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -797,16 +832,22 @@ public abstract class ComponentImpl { } } - public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName) { + public Response stopFunctionInstances(final String tenant, + final String namespace, + final String componentName) { return stopFunctionInstances(tenant, namespace, componentName, false); } - public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName) { + public Response restartFunctionInstances(final String tenant, + final String namespace, + final String componentName) { return stopFunctionInstances(tenant, namespace, componentName, true); } - public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName, - boolean restart) { + public Response stopFunctionInstances(final String tenant, + final String namespace, + final String componentName, + final boolean restart) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -846,8 +887,10 @@ public abstract class ComponentImpl { } } - public FunctionStats getFunctionStats(final String tenant, final String namespace, final String componentName, - URI uri) throws IOException { + public FunctionStats getFunctionStats(final String tenant, + final String namespace, + final String componentName, + final URI uri) { if (!isWorkerServiceAvailable()) { throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } @@ -887,8 +930,11 @@ public abstract class ComponentImpl { } - public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, final String namespace, final String componentName, - String instanceId, URI uri) { + public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final URI uri) { if (!isWorkerServiceAvailable()) { throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while."); } @@ -997,14 +1043,17 @@ public abstract class ComponentImpl { return this.worker().getConnectorsManager().getConnectors(); } - public Response triggerFunction(final String tenant, final String namespace, final String functionName, - final String input, final InputStream uploadedInputStream, final String topic) { + public Response triggerFunction(final String tenant, + final String namespace, + final String functionName, + final String input, + final InputStream uploadedInputStream, + final String topic) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } - FunctionDetails functionDetails; // validate parameters try { validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream); @@ -1098,8 +1147,10 @@ public abstract class ComponentImpl { } } - public Response getFunctionState(final String tenant, final String namespace, - final String functionName, final String key) { + public Response getFunctionState(final String tenant, + final String namespace, + final String functionName, + final String key) { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } @@ -1209,7 +1260,7 @@ public abstract class ComponentImpl { }).build(); } - private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException { + private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); @@ -1219,8 +1270,11 @@ public abstract class ComponentImpl { } } - protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName, - ComponentType componentType, String instanceId) throws IllegalArgumentException { + protected void validateGetFunctionInstanceRequestParams(final String tenant, + final String namespace, + final String componentName, + final ComponentType componentType, + final String instanceId) throws IllegalArgumentException { validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); if (instanceId == null) { throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType)); @@ -1255,10 +1309,14 @@ public abstract class ComponentImpl { } } - private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName, - String functionPkgUrl, String functionDetailsJson, String componentConfigJson, - ComponentType componentType) - throws IllegalArgumentException, IOException, URISyntaxException { + private FunctionDetails validateUpdateRequestParamsWithPkgUrl(final String tenant, + final String namespace, + final String componentName, + final String functionPkgUrl, + final String functionDetailsJson, + final String componentConfigJson, + final ComponentType componentType) + throws IllegalArgumentException, IOException { if (!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)"); } @@ -1267,9 +1325,14 @@ public abstract class ComponentImpl { return functionDetails; } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, - File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson, - String componentConfigJson, ComponentType componentType) + private FunctionDetails validateUpdateRequestParams(final String tenant, + final String namespace, + final String componentName, + final File uploadedInputStreamAsFile, + final FormDataContentDisposition fileDetail, + final String functionDetailsJson, + final String componentConfigJson, + final ComponentType componentType) throws IllegalArgumentException, IOException { FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, @@ -1281,9 +1344,12 @@ public abstract class ComponentImpl { return functionDetails; } - private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName, - PackageLocationMetaData packageLocationMetaData, - String componentConfigJson, ComponentType componentType) throws Exception { + private FunctionDetails validateUpdateRequestParamsWithExistingMetadata(final String tenant, + final String namespace, + final String componentName, + final PackageLocationMetaData packageLocationMetaData, + final String componentConfigJson, + final ComponentType componentType) throws Exception { File tmpFile = File.createTempFile("functions", null); tmpFile.deleteOnExit(); Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath()); @@ -1291,7 +1357,7 @@ public abstract class ComponentImpl { null, componentConfigJson, componentType, null, tmpFile); } - private static File dumpToTmpFile(InputStream uploadedInputStream) { + private static File dumpToTmpFile(final InputStream uploadedInputStream) { try { File tmpFile = File.createTempFile("functions", null); tmpFile.deleteOnExit(); @@ -1302,7 +1368,10 @@ public abstract class ComponentImpl { } } - private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key) + private void validateGetFunctionStateParams(final String tenant, + final String namespace, + final String functionName, + final String key) throws IllegalArgumentException { if (tenant == null) { @@ -1355,9 +1424,14 @@ public abstract class ComponentImpl { return null; } - private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, - String functionDetailsJson, String componentConfigJson, ComponentType componentType, - String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException { + private FunctionDetails validateUpdateRequestParams(final String tenant, + final String namespace, + final String componentName, + final String functionDetailsJson, + final String componentConfigJson, + final ComponentType componentType, + final String functionPkgUrl, + final File uploadedInputStreamAsFile) throws IOException { if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -1370,7 +1444,7 @@ public abstract class ComponentImpl { if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) { FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class); - // The rest end points take precendence over whatever is there in functionconfig + // The rest end points take precedence over whatever is there in functionconfig functionConfig.setTenant(tenant); functionConfig.setNamespace(namespace); functionConfig.setName(componentName); @@ -1381,7 +1455,7 @@ public abstract class ComponentImpl { if (componentType.equals(SOURCE)) { Path archivePath = null; SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class); - // The rest end points take precendence over whatever is there in sourceconfig + // The rest end points take precedence over whatever is there in sourceconfig sourceConfig.setTenant(tenant); sourceConfig.setNamespace(namespace); sourceConfig.setName(componentName); @@ -1403,7 +1477,7 @@ public abstract class ComponentImpl { if (componentType.equals(SINK)) { Path archivePath = null; SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class); - // The rest end points take precendence over whatever is there in sinkConfig + // The rest end points take precedence over whatever is there in sinkConfig sinkConfig.setTenant(tenant); sinkConfig.setNamespace(namespace); sinkConfig.setName(componentName); @@ -1562,8 +1636,14 @@ public abstract class ComponentImpl { return TypeResolver.resolveRawArgument(funClass, loadedClass); } - private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, - String input, InputStream uploadedInputStream) { + private void validateTriggerRequestParams(final String tenant, + final String namespace, + final String functionName, + final String topic, + final String input, + final InputStream uploadedInputStream) { + // Note : Checking topic is not required it can be null + if (tenant == null) { throw new IllegalArgumentException("Tenant is not provided"); } @@ -1660,8 +1740,10 @@ public abstract class ComponentImpl { } } - protected void componentInstanceStatusRequestValidate (final String tenant, final String namespace, - final String componentName, int instanceId) { + protected void componentInstanceStatusRequestValidate (final String tenant, + final String namespace, + final String componentName, + final int instanceId) { componentStatusRequestValidate(tenant, namespace, componentName); FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 9a6f739..f3e41a1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; -import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.LinkedList; @@ -155,7 +153,10 @@ public class FunctionsImpl extends ComponentImpl { } @Override - public FunctionStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) { + public FunctionStatus getStatusExternal(final String tenant, + final String namespace, + final String name, + final int parallelism) { FunctionStatus functionStatus = new FunctionStatus(); for (int i = 0; i < parallelism; ++i) { FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData @@ -177,7 +178,7 @@ public class FunctionsImpl extends ComponentImpl { } @Override - public FunctionStatus emptyStatus(int parallelism) { + public FunctionStatus emptyStatus(final int parallelism) { FunctionStatus functionStatus = new FunctionStatus(); functionStatus.setNumInstances(parallelism); functionStatus.setNumRunning(0); @@ -209,8 +210,11 @@ public class FunctionsImpl extends ComponentImpl { * @param instanceId the function instance id * @return the function status */ - public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName, - final String instanceId, URI uri) throws IOException { + public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant, + final String namespace, + final String componentName, + final String instanceId, + final URI uri) { // validate parameters componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId)); @@ -237,8 +241,10 @@ public class FunctionsImpl extends ComponentImpl { * @return a list of function statuses * @throws PulsarAdminException */ - public FunctionStatus getFunctionStatus(final String tenant, final String namespace, final String componentName, - URI uri) { + public FunctionStatus getFunctionStatus(final String tenant, + final String namespace, + final String componentName, + final URI uri) { // validate parameters componentStatusRequestValidate(tenant, namespace, componentName); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java index b94dfd8..9d7b316 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.functions.worker.rest.api; -import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.apache.pulsar.common.util.SimpleTextOutputStream; @@ -38,7 +37,7 @@ public class FunctionsMetricsResource extends FunctionApiResource { @Path("metrics") @GET @Produces(MediaType.TEXT_PLAIN) - public Response getMetrics() throws JsonProcessingException { + public Response getMetrics() { WorkerService workerService = get(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java index 6d70fbb..862aaf6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java @@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; -import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.LinkedList; @@ -123,13 +121,18 @@ public class SinkImpl extends ComponentImpl { } @Override - public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException { + public SinkStatus getStatus(final String tenant, + final String namespace, + final String name, + final Collection<Function.Assignment> assignments, + final URI uri) throws PulsarAdminException { SinkStatus sinkStatus = new SinkStatus(); for (Function.Assignment assignment : assignments) { boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId()); SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData; if (isOwner) { - sinkInstanceStatusData = getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null); + sinkInstanceStatusData = getComponentInstanceStatus(tenant, + namespace, name, assignment.getInstance().getInstanceId(), null); } else { sinkInstanceStatusData = worker().getFunctionAdmin().sink().getSinkStatus( assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), @@ -154,7 +157,10 @@ public class SinkImpl extends ComponentImpl { } @Override - public SinkStatus getStatusExternal (String tenant, String namespace, String name, int parallelism) { + public SinkStatus getStatusExternal(final String tenant, + final String namespace, + final String name, + final int parallelism) { SinkStatus sinkStatus = new SinkStatus(); for (int i = 0; i < parallelism; ++i) { SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData @@ -176,7 +182,7 @@ public class SinkImpl extends ComponentImpl { } @Override - public SinkStatus emptyStatus(int parallelism) { + public SinkStatus emptyStatus(final int parallelism) { SinkStatus sinkStatus = new SinkStatus(); sinkStatus.setNumInstances(parallelism); sinkStatus.setNumRunning(0); @@ -200,9 +206,11 @@ public class SinkImpl extends ComponentImpl { super(workerServiceSupplier, ComponentType.SINK); } - public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus( - String tenant, String namespace, String sinkName, String instanceId, URI uri) - throws IOException { + public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant, + final String namespace, + final String sinkName, + final String instanceId, + final URI uri) { // validate parameters componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId)); @@ -221,9 +229,10 @@ public class SinkImpl extends ComponentImpl { return sinkInstanceStatusData; } - public SinkStatus getSinkStatus( - final String tenant, final String namespace, - final String componentName, URI uri) { + public SinkStatus getSinkStatus(final String tenant, + final String namespace, + final String componentName, + final URI uri) { // validate parameters componentStatusRequestValidate(tenant, namespace, componentName); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java index 8c3c988..d915bb0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java @@ -24,13 +24,11 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation; import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.RestException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; -import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.LinkedList; @@ -122,7 +120,11 @@ public class SourceImpl extends ComponentImpl { } @Override - public SourceStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException { + public SourceStatus getStatus(final String tenant, + final String namespace, + final String name, + final Collection<Function.Assignment> assignments, + final URI uri) throws PulsarAdminException { SourceStatus sourceStatus = new SourceStatus(); for (Function.Assignment assignment : assignments) { boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId()); @@ -153,7 +155,10 @@ public class SourceImpl extends ComponentImpl { } @Override - public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) { + public SourceStatus getStatusExternal(final String tenant, + final String namespace, + final String name, + final int parallelism) { SourceStatus sinkStatus = new SourceStatus(); for (int i = 0; i < parallelism; ++i) { SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData @@ -175,7 +180,7 @@ public class SourceImpl extends ComponentImpl { } @Override - public SourceStatus emptyStatus(int parallelism) { + public SourceStatus emptyStatus(final int parallelism) { SourceStatus sourceStatus = new SourceStatus(); sourceStatus.setNumInstances(parallelism); sourceStatus.setNumRunning(0); @@ -199,8 +204,10 @@ public class SourceImpl extends ComponentImpl { super(workerServiceSupplier, ComponentType.SOURCE); } - public SourceStatus getSourceStatus(final String tenant, final String namespace, - final String componentName, URI uri) throws IOException { + public SourceStatus getSourceStatus(final String tenant, + final String namespace, + final String componentName, + final URI uri) { // validate parameters componentStatusRequestValidate(tenant, namespace, componentName); @@ -217,8 +224,11 @@ public class SourceImpl extends ComponentImpl { return sourceStatus; } - public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus( - String tenant, String namespace, String sourceName, String instanceId, URI uri) { + public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(final String tenant, + final String namespace, + final String sourceName, + final String instanceId, + final URI uri) { // validate parameters componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId)); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index bafb7a9..56c945d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -27,14 +27,20 @@ import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.worker.*; +import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; +import org.apache.pulsar.functions.worker.FunctionRuntimeManager; +import org.apache.pulsar.functions.worker.MembershipManager; +import org.apache.pulsar.functions.worker.Utils; +import org.apache.pulsar.functions.worker.WorkerService; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; -import java.io.*; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkNotNull; @@ -118,11 +124,11 @@ public class WorkerImpl { .build(); } - public boolean isSuperUser(String clientRole) { + public boolean isSuperUser(final String clientRole) { return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); } - public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(String clientRole) throws IOException { + public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final String clientRole) { if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); throw new WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) @@ -135,12 +141,12 @@ public class WorkerImpl { if (!isWorkerServiceAvailable()) { throw new WebApplicationException( Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData("Function worker service is not avaialable")).build()); + .entity(new ErrorData("Function worker service is not available")).build()); } return worker().getMetricsGenerator().generate(); } - public Response getFunctionsMetrics(String clientRole) throws IOException { + public Response getFunctionsMetrics(final String clientRole) { if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) { log.error("Client [{}] is not admin and authorized to get function-stats", clientRole); return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) @@ -156,7 +162,7 @@ public class WorkerImpl { public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics; } - private Response getFunctionsMetrics() throws IOException { + private Response getFunctionsMetrics() { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); } @@ -171,7 +177,8 @@ public class WorkerImpl { String fullyQualifiedInstanceName = entry.getKey(); FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); - FunctionStats.FunctionInstanceStats functionInstanceStats = Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo); + FunctionStats.FunctionInstanceStats functionInstanceStats = + Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo); WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats(); workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);