This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 17a7d91 Seperate cmd line connector interface to explicit source and sink (#1745) 17a7d91 is described below commit 17a7d91ab5b73cfbce1ae775dfc6312e88896287 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue May 8 02:05:33 2018 -0700 Seperate cmd line connector interface to explicit source and sink (#1745) --- .../cli/{CmdConnectors.java => CmdSinks.java} | 245 ++------------------ .../cli/{CmdConnectors.java => CmdSources.java} | 257 ++------------------- .../apache/pulsar/admin/cli/PulsarAdminTool.java | 3 +- 3 files changed, 39 insertions(+), 466 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java similarity index 57% copy from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java copy to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 0492d76..01ec16c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -27,6 +27,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.naming.TopicName; @@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.shaded.proto.Function; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; -import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.SinkConfig; -import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.source.PulsarSource; -import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.*; import java.io.File; import java.io.IOException; @@ -56,32 +53,24 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -import net.jodah.typetools.TypeResolver; - @Slf4j @Getter -@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)") -public class CmdConnectors extends CmdBase { +@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress data from Pulsar)") +public class CmdSinks extends CmdBase { - private final CreateSource createSource; private final CreateSink createSink; - private final DeleteConnector deleteConnector; - private final LocalSourceRunner localSourceRunner; + private final DeleteSink deleteSink; private final LocalSinkRunner localSinkRunner; - public CmdConnectors(PulsarAdmin admin) { - super("connectors", admin); - createSource = new CreateSource(); + public CmdSinks(PulsarAdmin admin) { + super("sink", admin); createSink = new CreateSink(); - deleteConnector = new DeleteConnector(); - localSourceRunner = new LocalSourceRunner(); + deleteSink = new DeleteSink(); localSinkRunner = new LocalSinkRunner(); - jcommander.addCommand("create-source", createSource); - jcommander.addCommand("create-sink", createSink); - jcommander.addCommand("delete", deleteConnector); - jcommander.addCommand("localrun-source", localSourceRunner); - jcommander.addCommand("localrun-sink", localSinkRunner); + jcommander.addCommand("create", createSink); + jcommander.addCommand("delete", deleteSink); + jcommander.addCommand("localrun", localSinkRunner); } /** @@ -101,20 +90,7 @@ public class CmdConnectors extends CmdBase { abstract void runCmd() throws Exception; } - @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)") - - class LocalSourceRunner extends CreateSource { - - @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") - protected String brokerServiceUrl; - - @Override - void runCmd() throws Exception { - CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), - sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin); - } - } - + @Parameters(commandDescription = "Run the Pulsar sink locally (rather than deploying it to the Pulsar cluster)") class LocalSinkRunner extends CreateSink { @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") @@ -127,173 +103,6 @@ public class CmdConnectors extends CmdBase { } } - @Parameters(commandDescription = "Create Pulsar source connectors") - class CreateSource extends BaseCommand { - @Parameter(names = "--tenant", description = "The source's tenant") - protected String tenant; - @Parameter(names = "--namespace", description = "The source's namespace") - protected String namespace; - @Parameter(names = "--name", description = "The source's name") - protected String name; - @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source") - protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--className", description = "The source's class name") - protected String className; - @Parameter(names = "--destinationTopicName", description = "Pulsar topic to ingress data to") - protected String destinationTopicName; - @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source") - protected String deserializationClassName; - @Parameter(names = "--parallelism", description = "Number of instances of the source") - protected String parallelism; - @Parameter( - names = "--jar", - description = "Path to the jar file for the Source", - listConverter = StringConverter.class) - protected String jarFile; - - @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the " - + "source's configuration") - protected String sourceConfigFile; - - protected SourceConfig sourceConfig; - - @Override - void processArguments() throws Exception { - super.processArguments(); - - if (null != sourceConfigFile) { - this.sourceConfig = loadSourceConfig(sourceConfigFile); - } else { - this.sourceConfig = new SourceConfig(); - } - - if (null != tenant) { - sourceConfig.setTenant(tenant); - } - if (null != namespace) { - sourceConfig.setNamespace(namespace); - } - if (null != name) { - sourceConfig.setName(name); - } - - if (null != className) { - this.sourceConfig.setClassName(className); - } - if (null != destinationTopicName) { - sourceConfig.setTopicName(destinationTopicName); - } - if (null != deserializationClassName) { - sourceConfig.setSerdeClassName(deserializationClassName); - } - if (null != processingGuarantees) { - sourceConfig.setProcessingGuarantees(processingGuarantees); - } - if (parallelism == null) { - if (sourceConfig.getParallelism() == 0) { - sourceConfig.setParallelism(1); - } - } else { - int num = Integer.parseInt(parallelism); - if (num <= 0) { - throw new IllegalArgumentException("The parallelism factor (the number of instances) for the " - + "connector must be positive"); - } - sourceConfig.setParallelism(num); - } - - if (null == jarFile) { - throw new IllegalArgumentException("Connector JAR not specfied"); - } - } - - @Override - void runCmd() throws Exception { - if (!areAllRequiredFieldsPresentForSource(sourceConfig)) { - throw new RuntimeException("Missing arguments"); - } - admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile); - print("Created successfully"); - } - - private Class<?> getSourceType(File file) { - if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) { - throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s", - sourceConfig.getClassName(), jarFile)); - } else if (!Reflections.classInJarImplementsIface(file, sourceConfig.getClassName(), Source.class)) { - throw new IllegalArgumentException(String.format("The Pulsar source class %s in jar %s implements does not implement " + Source.class.getName(), - sourceConfig.getClassName(), jarFile)); - } - - Object userClass = Reflections.createInstance(sourceConfig.getClassName(), file); - Class<?> typeArg; - Source source = (Source) userClass; - if (source == null) { - throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated from jar %s", - sourceConfig.getClassName(), jarFile)); - } - typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass()); - - return typeArg; - } - - protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder - = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } - - protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) { - - File file = new File(jarFile); - try { - Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); - } - Class<?> typeArg = getSourceType(file); - - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - if (sourceConfig.getTenant() != null) { - functionDetailsBuilder.setTenant(sourceConfig.getTenant()); - } - if (sourceConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(sourceConfig.getNamespace()); - } - if (sourceConfig.getName() != null) { - functionDetailsBuilder.setName(sourceConfig.getName()); - } - functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); - functionDetailsBuilder.setParallelism(sourceConfig.getParallelism()); - functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); - if (sourceConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - convertProcessingGuarantee(sourceConfig.getProcessingGuarantees())); - } - - // set source spec - SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setClassName(sourceConfig.getClassName()); - sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); - sourceSpecBuilder.setTypeClassName(typeArg.getName()); - functionDetailsBuilder.setSource(sourceSpecBuilder); - - // set up sink spec - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - sinkSpecBuilder.setClassName(PulsarSink.class.getName()); - if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { - sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName()); - } - sinkSpecBuilder.setTopic(sourceConfig.getTopicName()); - sinkSpecBuilder.setTypeClassName(typeArg.getName()); - - functionDetailsBuilder.setSink(sinkSpecBuilder); - return functionDetailsBuilder.build(); - } - } - @Parameters(commandDescription = "Create Pulsar sink connectors") class CreateSink extends BaseCommand { @Parameter(names = "--tenant", description = "The sink's tenant") @@ -356,7 +165,7 @@ public class CmdConnectors extends CmdBase { inputTopics.forEach(new Consumer<String>() { @Override public void accept(String s) { - CmdConnectors.validateTopicName(s); + CmdSinks.validateTopicName(s); topicsToSerDeClassName.put(s, ""); } }); @@ -365,7 +174,7 @@ public class CmdConnectors extends CmdBase { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); customSerdeInputMap.forEach((topic, serde) -> { - CmdConnectors.validateTopicName(topic); + CmdSinks.validateTopicName(topic); topicsToSerDeClassName.put(topic, serde); }); } @@ -475,15 +284,15 @@ public class CmdConnectors extends CmdBase { } @Parameters(commandDescription = "Stops a Pulsar sink or source") - class DeleteConnector extends BaseCommand { + class DeleteSink extends BaseCommand { - @Parameter(names = "--tenant", description = "The tenant of a sink or source") + @Parameter(names = "--tenant", description = "The tenant of the sink") protected String tenant; - @Parameter(names = "--namespace", description = "The namespace of a sink or source") + @Parameter(names = "--namespace", description = "The namespace of the sink") protected String namespace; - @Parameter(names = "--name", description = "The name of a sink or source") + @Parameter(names = "--name", description = "The name of the sink") protected String name; @Override @@ -491,7 +300,7 @@ public class CmdConnectors extends CmdBase { super.processArguments(); if (null == tenant || null == namespace || null == name) { throw new RuntimeException( - "You must specify a tenant, namespace, and name for the sink or source"); + "You must specify a tenant, namespace, and name for the sink"); } } @@ -506,25 +315,11 @@ public class CmdConnectors extends CmdBase { return (SinkConfig) loadConfig(file, SinkConfig.class); } - private static SourceConfig loadSourceConfig(String file) throws IOException { - return (SourceConfig) loadConfig(file, SourceConfig.class); - } - private static Object loadConfig(String file, Class<?> clazz) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(file), clazz); } - public static boolean areAllRequiredFieldsPresentForSource(SourceConfig sourceConfig) { - return sourceConfig.getTenant() != null && !sourceConfig.getTenant().isEmpty() - && sourceConfig.getNamespace() != null && !sourceConfig.getNamespace().isEmpty() - && sourceConfig.getName() != null && !sourceConfig.getName().isEmpty() - && sourceConfig.getClassName() != null && !sourceConfig.getClassName().isEmpty() - && sourceConfig.getTopicName() != null && !sourceConfig.getTopicName().isEmpty() - || sourceConfig.getSerdeClassName() != null - && sourceConfig.getParallelism() > 0; - } - public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) { return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty() && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty() diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java similarity index 54% rename from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java rename to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 0492d76..c651e37 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -27,6 +27,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.naming.TopicName; @@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.shaded.proto.Function; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; -import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.SinkConfig; -import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.source.PulsarSource; -import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.*; import java.io.File; import java.io.IOException; @@ -56,32 +53,24 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -import net.jodah.typetools.TypeResolver; - @Slf4j @Getter -@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)") -public class CmdConnectors extends CmdBase { +@Parameters(commandDescription = "Interface for managing Pulsar Source (Ingress data to Pulsar)") +public class CmdSources extends CmdBase { private final CreateSource createSource; - private final CreateSink createSink; - private final DeleteConnector deleteConnector; + private final DeleteSource deleteSource; private final LocalSourceRunner localSourceRunner; - private final LocalSinkRunner localSinkRunner; - public CmdConnectors(PulsarAdmin admin) { - super("connectors", admin); + public CmdSources(PulsarAdmin admin) { + super("source", admin); createSource = new CreateSource(); - createSink = new CreateSink(); - deleteConnector = new DeleteConnector(); + deleteSource = new DeleteSource(); localSourceRunner = new LocalSourceRunner(); - localSinkRunner = new LocalSinkRunner(); - jcommander.addCommand("create-source", createSource); - jcommander.addCommand("create-sink", createSink); - jcommander.addCommand("delete", deleteConnector); - jcommander.addCommand("localrun-source", localSourceRunner); - jcommander.addCommand("localrun-sink", localSinkRunner); + jcommander.addCommand("create", createSource); + jcommander.addCommand("delete", deleteSource); + jcommander.addCommand("localrun", localSourceRunner); } /** @@ -101,8 +90,7 @@ public class CmdConnectors extends CmdBase { abstract void runCmd() throws Exception; } - @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)") - + @Parameters(commandDescription = "Run the Pulsar source locally (rather than deploying it to the Pulsar cluster)") class LocalSourceRunner extends CreateSource { @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") @@ -115,18 +103,6 @@ public class CmdConnectors extends CmdBase { } } - class LocalSinkRunner extends CreateSink { - - @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") - protected String brokerServiceUrl; - - @Override - void runCmd() throws Exception { - CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), - sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin); - } - } - @Parameters(commandDescription = "Create Pulsar source connectors") class CreateSource extends BaseCommand { @Parameter(names = "--tenant", description = "The source's tenant") @@ -294,188 +270,8 @@ public class CmdConnectors extends CmdBase { } } - @Parameters(commandDescription = "Create Pulsar sink connectors") - class CreateSink extends BaseCommand { - @Parameter(names = "--tenant", description = "The sink's tenant") - protected String tenant; - @Parameter(names = "--namespace", description = "The sink's namespace") - protected String namespace; - @Parameter(names = "--name", description = "The sink's name") - protected String name; - @Parameter(names = "--className", description = "The sink's class name") - protected String className; - @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)") - protected String inputs; - @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)") - protected String customSerdeInputString; - @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink") - protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--parallelism", description = "") - protected String parallelism; - @Parameter( - names = "--jar", - description = "Path to the jar file for the sink", - listConverter = StringConverter.class) - protected String jarFile; - - @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the " - + "sink's configuration") - protected String sinkConfigFile; - - protected SinkConfig sinkConfig; - - @Override - void processArguments() throws Exception { - super.processArguments(); - - if (null != sinkConfigFile) { - this.sinkConfig = loadSinkConfig(sinkConfigFile); - } else { - this.sinkConfig = new SinkConfig(); - } - - if (null != tenant) { - sinkConfig.setTenant(tenant); - } - if (null != namespace) { - sinkConfig.setNamespace(namespace); - } - if (null != name) { - sinkConfig.setName(name); - } - - if (null != className) { - sinkConfig.setClassName(className); - } - if (null != processingGuarantees) { - sinkConfig.setProcessingGuarantees(processingGuarantees); - } - Map<String, String> topicsToSerDeClassName = new HashMap<>(); - if (null != inputs) { - List<String> inputTopics = Arrays.asList(inputs.split(",")); - inputTopics.forEach(new Consumer<String>() { - @Override - public void accept(String s) { - CmdConnectors.validateTopicName(s); - topicsToSerDeClassName.put(s, ""); - } - }); - } - if (null != customSerdeInputString) { - Type type = new TypeToken<Map<String, String>>(){}.getType(); - Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type); - customSerdeInputMap.forEach((topic, serde) -> { - CmdConnectors.validateTopicName(topic); - topicsToSerDeClassName.put(topic, serde); - }); - } - sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); - - if (parallelism == null) { - if (sinkConfig.getParallelism() == 0) { - sinkConfig.setParallelism(1); - } - } else { - int num = Integer.parseInt(parallelism); - if (num <= 0) { - throw new IllegalArgumentException("The parallelism factor (the number of instances) for the " - + "connector must be positive"); - } - sinkConfig.setParallelism(num); - } - - if (null == jarFile) { - throw new IllegalArgumentException("Connector JAR not specfied"); - } - } - - @Override - void runCmd() throws Exception { - log.info("sinkConfig: {}", sinkConfig); - if (!areAllRequiredFieldsPresentForSink(sinkConfig)) { - throw new RuntimeException("Missing arguments"); - } - admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile); - print("Created successfully"); - } - - private Class<?> getSinkType(File file) { - if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) { - throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s", - sinkConfig.getClassName(), jarFile)); - } else if (!Reflections.classInJarImplementsIface(file, sinkConfig.getClassName(), Sink.class)) { - throw new IllegalArgumentException(String.format("The Pulsar sink class %s in jar %s implements " + Sink.class.getName(), - sinkConfig.getClassName(), jarFile)); - } - - Object userClass = Reflections.createInstance(sinkConfig.getClassName(), file); - Class<?> typeArg; - Sink sink = (Sink) userClass; - if (sink == null) { - throw new IllegalArgumentException(String.format("The Pulsar sink class %s could not be instantiated from jar %s", - sinkConfig.getClassName(), jarFile)); - } - typeArg = TypeResolver.resolveRawArgument(Sink.class, sink.getClass()); - - return typeArg; - } - - protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkConfigProto2(SinkConfig sinkConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder - = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } - - protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) { - - File file = new File(jarFile); - try { - Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); - } - Class<?> typeArg = getSinkType(file); - - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - if (sinkConfig.getTenant() != null) { - functionDetailsBuilder.setTenant(sinkConfig.getTenant()); - } - if (sinkConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(sinkConfig.getNamespace()); - } - if (sinkConfig.getName() != null) { - functionDetailsBuilder.setName(sinkConfig.getName()); - } - functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); - functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); - functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); - if (sinkConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - convertProcessingGuarantee(sinkConfig.getProcessingGuarantees())); - } - - // set source spec - SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setClassName(PulsarSource.class.getName()); - sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); - sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName()); - sourceSpecBuilder.setTypeClassName(typeArg.getName()); - functionDetailsBuilder.setSource(sourceSpecBuilder); - - // set up sink spec - SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - sinkSpecBuilder.setClassName(sinkConfig.getClassName()); - sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); - sinkSpecBuilder.setTypeClassName(typeArg.getName()); - functionDetailsBuilder.setSink(sinkSpecBuilder); - return functionDetailsBuilder.build(); - } - } - - @Parameters(commandDescription = "Stops a Pulsar sink or source") - class DeleteConnector extends BaseCommand { + @Parameters(commandDescription = "Stops a Pulsar source") + class DeleteSource extends BaseCommand { @Parameter(names = "--tenant", description = "The tenant of a sink or source") protected String tenant; @@ -491,21 +287,17 @@ public class CmdConnectors extends CmdBase { super.processArguments(); if (null == tenant || null == namespace || null == name) { throw new RuntimeException( - "You must specify a tenant, namespace, and name for the sink or source"); + "You must specify a tenant, namespace, and name for the source"); } } @Override void runCmd() throws Exception { admin.functions().deleteFunction(tenant, namespace, name); - print("Deleted successfully"); + print("Delete source successfully"); } } - private static SinkConfig loadSinkConfig(String file) throws IOException { - return (SinkConfig) loadConfig(file, SinkConfig.class); - } - private static SourceConfig loadSourceConfig(String file) throws IOException { return (SourceConfig) loadConfig(file, SourceConfig.class); } @@ -525,21 +317,6 @@ public class CmdConnectors extends CmdBase { && sourceConfig.getParallelism() > 0; } - public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) { - return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty() - && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty() - && sinkConfig.getName() != null && !sinkConfig.getName().isEmpty() - && sinkConfig.getClassName() != null && !sinkConfig.getClassName().isEmpty() - && sinkConfig.getTopicToSerdeClassName() != null && !sinkConfig.getTopicToSerdeClassName().isEmpty() - && sinkConfig.getParallelism() > 0; - } - - private static void validateTopicName(String topic) { - if (!TopicName.isValid(topic)) { - throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic)); - } - } - private static ProcessingGuarantees convertProcessingGuarantee( FunctionConfig.ProcessingGuarantees processingGuarantees) { for (ProcessingGuarantees type : ProcessingGuarantees.values()) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index c0ee2cb..6467072 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -96,7 +96,8 @@ public class PulsarAdminTool { commandMap.put("resource-quotas", CmdResourceQuotas.class); commandMap.put("functions", CmdFunctions.class); - commandMap.put("connectors", CmdConnectors.class); + commandMap.put("source", CmdSources.class); + commandMap.put("sink", CmdSinks.class); } private void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) { -- To stop receiving notification emails like this one, please contact si...@apache.org.