This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7f97bde adding update functionality to sources and sinks (#1813) 7f97bde is described below commit 7f97bde2a384ae00bfbfd1c9f54deb3db124a1ad Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon May 21 00:52:58 2018 -0700 adding update functionality to sources and sinks (#1813) --- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 41 +++++++++++++++------- .../org/apache/pulsar/admin/cli/CmdSources.java | 39 ++++++++++++++------ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 04e73bf..ed1cfa7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -45,7 +45,6 @@ import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.SinkConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.Source; import java.io.File; import java.io.IOException; @@ -63,16 +62,19 @@ import java.util.function.Consumer; public class CmdSinks extends CmdBase { private final CreateSink createSink; + private final UpdateSink updateSink; private final DeleteSink deleteSink; private final LocalSinkRunner localSinkRunner; public CmdSinks(PulsarAdmin admin) { super("sink", admin); createSink = new CreateSink(); + updateSink = new UpdateSink(); deleteSink = new DeleteSink(); localSinkRunner = new LocalSinkRunner(); jcommander.addCommand("create", createSink); + jcommander.addCommand("update", updateSink); jcommander.addCommand("delete", deleteSink); jcommander.addCommand("localrun", localSinkRunner); } @@ -108,7 +110,31 @@ public class CmdSinks extends CmdBase { } @Parameters(commandDescription = "Create Pulsar sink connectors") - class CreateSink extends BaseCommand { + class CreateSink extends SinkCommand { + @Override + void runCmd() throws Exception { + if (!areAllRequiredFieldsPresentForSink(sinkConfig)) { + throw new RuntimeException("Missing arguments"); + } + admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile); + print("Created successfully"); + } + } + + @Parameters(commandDescription = "Update Pulsar sink connectors") + class UpdateSink extends SinkCommand { + @Override + void runCmd() throws Exception { + if (!areAllRequiredFieldsPresentForSink(sinkConfig)) { + throw new RuntimeException("Missing arguments"); + } + admin.functions().updateFunction(createSinkConfig(sinkConfig), jarFile); + print("Updated successfully"); + } + } + + @Parameters(commandDescription = "Create Pulsar sink connectors") + abstract class SinkCommand extends BaseCommand { @Parameter(names = "--tenant", description = "The sink's tenant") protected String tenant; @Parameter(names = "--namespace", description = "The sink's namespace") @@ -123,7 +149,7 @@ public class CmdSinks extends CmdBase { protected String customSerdeInputString; @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink") protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--parallelism", description = "") + @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected String parallelism; @Parameter( names = "--jar", @@ -221,15 +247,6 @@ public class CmdSinks extends CmdBase { } } - @Override - void runCmd() throws Exception { - if (!areAllRequiredFieldsPresentForSink(sinkConfig)) { - throw new RuntimeException("Missing arguments"); - } - admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile); - print("Created successfully"); - } - private Class<?> getSinkType(File file) { if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) { throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s", diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 1583589..dbcdef7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -64,15 +64,18 @@ public class CmdSources extends CmdBase { private final CreateSource createSource; private final DeleteSource deleteSource; + private final UpdateSource updateSource; private final LocalSourceRunner localSourceRunner; public CmdSources(PulsarAdmin admin) { super("source", admin); createSource = new CreateSource(); + updateSource = new UpdateSource(); deleteSource = new DeleteSource(); localSourceRunner = new LocalSourceRunner(); jcommander.addCommand("create", createSource); + jcommander.addCommand("update", updateSource); jcommander.addCommand("delete", deleteSource); jcommander.addCommand("localrun", localSourceRunner); } @@ -108,7 +111,30 @@ public class CmdSources extends CmdBase { } @Parameters(commandDescription = "Create Pulsar source connectors") - class CreateSource extends BaseCommand { + public class CreateSource extends SourceCommand { + @Override + void runCmd() throws Exception { + if (!areAllRequiredFieldsPresentForSource(sourceConfig)) { + throw new RuntimeException("Missing arguments"); + } + admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile); + print("Created successfully"); + } + } + + @Parameters(commandDescription = "Update Pulsar source connectors") + public class UpdateSource extends SourceCommand { + @Override + void runCmd() throws Exception { + if (!areAllRequiredFieldsPresentForSource(sourceConfig)) { + throw new RuntimeException("Missing arguments"); + } + admin.functions().updateFunction(createSourceConfig(sourceConfig), jarFile); + print("Updated successfully"); + } + } + + abstract class SourceCommand extends BaseCommand { @Parameter(names = "--tenant", description = "The source's tenant") protected String tenant; @Parameter(names = "--namespace", description = "The source's namespace") @@ -123,7 +149,7 @@ public class CmdSources extends CmdBase { protected String destinationTopicName; @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source") protected String deserializationClassName; - @Parameter(names = "--parallelism", description = "Number of instances of the source") + @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)") protected String parallelism; @Parameter( names = "--jar", @@ -206,15 +232,6 @@ public class CmdSources extends CmdBase { } } - @Override - void runCmd() throws Exception { - if (!areAllRequiredFieldsPresentForSource(sourceConfig)) { - throw new RuntimeException("Missing arguments"); - } - admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile); - print("Created successfully"); - } - private Class<?> getSourceType(File file) { if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) { throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s", -- To stop receiving notification emails like this one, please contact si...@apache.org.