srkukarni closed pull request #2381: Make source and sink cli args format 
consistent
URL: https://github.com/apache/incubator-pulsar/pull/2381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5f52313d57..1a7d1e96e1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -268,7 +268,10 @@ void processArguments() throws Exception {
         protected String DEPRECATED_userConfigString;
         @Parameter(names = "--user-config", description = "User-defined config 
key/values")
         protected String userConfigString;
-        @Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+        // for backwards compatibility purposes
+        @Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order", hidden = true)
+        protected Boolean DEPRECATED_retainOrdering;
+        @Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
         protected boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
         protected Integer parallelism;
@@ -319,8 +322,9 @@ private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) 
outputSerdeClassName = DEPRECATED_outputSerdeClassName;
             if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) 
customSerdeInputString = DEPRECATED_customSerdeInputString;
             if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = 
DEPRECATED_fnConfigFile;
-            if (DEPRECATED_processingGuarantees != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = 
DEPRECATED_processingGuarantees;
+            if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
             if (!StringUtils.isBlank(DEPRECATED_userConfigString)) 
userConfigString = DEPRECATED_userConfigString;
+            if (DEPRECATED_retainOrdering != null) retainOrdering = 
DEPRECATED_retainOrdering;
             if (DEPRECATED_windowLengthCount != null) windowLengthCount = 
DEPRECATED_windowLengthCount;
             if (DEPRECATED_windowLengthDurationMs != null) 
windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
             if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount 
= DEPRECATED_slidingIntervalCount;
@@ -787,6 +791,7 @@ private void mergeArgs() {
 
         @Override
         void runCmd() throws Exception {
+            // merge deprecated args with new args
             mergeArgs();
             CmdFunctions.startLocalRun(convertProto2(functionConfig), 
functionConfig.getParallelism(),
                     instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
@@ -964,6 +969,7 @@ public void mergeArgs() {
 
         @Override
         void runCmd() throws Exception {
+            // merge deprecated args with new args
             mergeArgs();
             if (triggerFile == null && triggerValue == null) {
                 throw new ParameterException("Either a trigger value or a 
trigger filepath needs to be specified");
@@ -998,6 +1004,7 @@ private void mergeArgs() {
 
         @Override
         void runCmd() throws Exception {
+            // merge deprecated args with new args
             mergeArgs();
             if (StringUtils.isBlank(sourceFile)) {
                 throw new ParameterException("--source-file needs to be 
specified");
@@ -1032,6 +1039,7 @@ private void mergeArgs() {
 
         @Override
         void runCmd() throws Exception {
+            // merge deprecated args with new args
             mergeArgs();
             if (StringUtils.isBlank(destinationFile)) {
                 throw new ParameterException("--destination-file needs to be 
specified");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 8417874f38..5ccd9b0853 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -24,23 +24,9 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -50,7 +36,6 @@
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -64,6 +49,20 @@
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -114,29 +113,62 @@ void processArguments() throws Exception {
     @Parameters(commandDescription = "Run a Pulsar IO sink connector locally 
(rather than deploying it to the Pulsar cluster)")
     protected class LocalSinkRunner extends CreateSink {
 
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
+        // for backwards compatibility purposes
+        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker", hidden = true)
+        protected String DEPRECATED_brokerServiceUrl;
+        @Parameter(names = "--broker-service-url", description = "The URL for 
the Pulsar broker")
         protected String brokerServiceUrl;
 
-        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker")
+        // for backwards compatibility purposes
+        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker", 
hidden = true)
+        protected String DEPRECATED_clientAuthPlugin;
+        @Parameter(names = "--client-auth-plugin", description = "Client 
authentication plugin using which function-process can connect to broker")
         protected String clientAuthPlugin;
 
-        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param")
+        // for backwards compatibility purposes
+        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param", hidden = true)
+        protected String DEPRECATED_clientAuthParams;
+        @Parameter(names = "--client-auth-params", description = "Client 
authentication param")
         protected String clientAuthParams;
 
-        @Parameter(names = "--use_tls", description = "Use tls connection\n")
+        // for backwards compatibility purposes
+        @Parameter(names = "--use_tls", description = "Use tls connection\n", 
hidden = true)
+        protected Boolean DEPRECATED_useTls;
+        @Parameter(names = "--use-tls", description = "Use tls connection\n")
         protected boolean useTls;
 
-        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n")
+        // for backwards compatibility purposes
+        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n", hidden = true)
+        protected Boolean DEPRECATED_tlsAllowInsecureConnection;
+        @Parameter(names = "--tls-allow-insecure", description = "Allow 
insecure tls connection\n")
         protected boolean tlsAllowInsecureConnection;
 
-        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+        // for backwards compatibility purposes
+        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification", hidden = true)
+        protected Boolean DEPRECATED_tlsHostNameVerificationEnabled;
+        @Parameter(names = "--hostname-verification-enabled", description = 
"Enable hostname verification")
         protected boolean tlsHostNameVerificationEnabled;
 
-        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path")
+        // for backwards compatibility purposes
+        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path", hidden = true)
+        protected String DEPRECATED_tlsTrustCertFilePath;
+        @Parameter(names = "--tls-trust-cert-path", description = "tls trust 
cert file path")
         protected String tlsTrustCertFilePath;
 
+        private void mergeArgs() {
+            if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) 
brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+            if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) 
clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+            if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) 
clientAuthParams = DEPRECATED_clientAuthParams;
+            if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
+            if (DEPRECATED_tlsAllowInsecureConnection != null) 
tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
+            if (DEPRECATED_tlsHostNameVerificationEnabled != null) 
tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
+            if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) 
tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+        }
+
         @Override
         void runCmd() throws Exception {
+            // merge deprecated args with new args
+            mergeArgs();
             CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), 
sinkConfig.getParallelism(),
                     0, brokerServiceUrl, null,
                     
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
@@ -200,46 +232,91 @@ void runCmd() throws Exception {
         protected String namespace;
         @Parameter(names = "--name", description = "The sink's name")
         protected String name;
-
         @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's 
connector provider")
         protected String sinkType;
-
         @Parameter(names = "--inputs", description = "The sink's input topic 
or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
-        @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)", hidden = true)
+        protected String DEPRECATED_topicsPattern;
+        @Parameter(names = "--topics-pattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
         protected String topicsPattern;
-        @Parameter(names = "--subsName", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--subsName", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer", hidden = true)
+        protected String DEPRECATED_subsName;
+        @Parameter(names = "--subs-name", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
         protected String subsName;
-        @Parameter(names = "--customSerdeInputs", description = "The map of 
input topics to SerDe class names (as a JSON string)")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--customSerdeInputs", description = "The map of 
input topics to SerDe class names (as a JSON string)", hidden = true)
+        protected String DEPRECATED_customSerdeInputString;
+        @Parameter(names = "--custom-serde-inputs", description = "The map of 
input topics to SerDe class names (as a JSON string)")
         protected String customSerdeInputString;
-        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink", hidden = 
true)
+        protected FunctionConfig.ProcessingGuarantees 
DEPRECATED_processingGuarantees;
+        @Parameter(names = "--processing-guarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order", hidden = true)
+        protected Boolean DEPRECATED_retainOrdering;
+        @Parameter(names = "--retain-ordering", description = "Sink consumes 
and sinks messages in order")
         protected boolean retainOrdering;
+
         @Parameter(names = "--parallelism", description = "The sink's 
parallelism factor (i.e. the number of sink instances to run)")
         protected Integer parallelism;
         @Parameter(names = {"-a", "--archive"}, description = "Path to the 
archive file for the sink. It also supports url-path [http/https/file (file 
protocol assumes that file already exists on worker host)] from which worker 
can download the package.", listConverter = StringConverter.class)
         protected String archive;
-        @Parameter(names = "--className", description = "The sink's class name 
if archive is file-url-path (file://)")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--className", description = "The sink's class name 
if archive is file-url-path (file://)", hidden = true)
+        protected String DEPRECATED_className;
+        @Parameter(names = "--classname", description = "The sink's class name 
if archive is file-url-path (file://)")
         protected String className;
 
+        // for backwards compatibility purposes
         @Parameter(names = "--sinkConfigFile", description = "The path to a 
YAML config file specifying the "
+                + "sink's configuration", hidden = true)
+        protected String DEPRECATED_sinkConfigFile;
+        @Parameter(names = "--sink-config-file", description = "The path to a 
YAML config file specifying the "
                 + "sink's configuration")
         protected String sinkConfigFile;
+
         @Parameter(names = "--cpu", description = "The CPU (in cores) that 
needs to be allocated per sink instance (applicable only to Docker runtime)")
         protected Double cpu;
         @Parameter(names = "--ram", description = "The RAM (in bytes) that 
need to be allocated per sink instance (applicable only to the process and 
Docker runtimes)")
         protected Long ram;
         @Parameter(names = "--disk", description = "The disk (in bytes) that 
need to be allocated per sink instance (applicable only to Docker runtime)")
         protected Long disk;
-        @Parameter(names = "--sinkConfig", description = "User defined configs 
key/values")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--sinkConfig", description = "User defined configs 
key/values", hidden = true)
+        protected String DEPRECATED_sinkConfigString;
+        @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
         protected String sinkConfigString;
 
         protected SinkConfig sinkConfig;
 
+        private void mergeArgs() {
+            if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = 
DEPRECATED_subsName;
+            if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern 
= DEPRECATED_topicsPattern;
+            if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) 
customSerdeInputString = DEPRECATED_customSerdeInputString;
+            if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
+            if (DEPRECATED_retainOrdering != null) retainOrdering = 
DEPRECATED_retainOrdering;
+            if (!StringUtils.isBlank(DEPRECATED_className)) className = 
DEPRECATED_className;
+            if (!StringUtils.isBlank(DEPRECATED_sinkConfigFile)) 
sinkConfigFile = DEPRECATED_sinkConfigFile;
+            if (!StringUtils.isBlank(DEPRECATED_sinkConfigString)) 
sinkConfigString = DEPRECATED_sinkConfigString;
+        }
+
         @Override
         void processArguments() throws Exception {
             super.processArguments();
+            // merge deprecated args with new args
+            mergeArgs();
 
             if (null != sinkConfigFile) {
                 this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, 
SinkConfig.class);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index d050ab515c..8f72d0a69c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -24,16 +24,6 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -61,8 +51,10 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
@@ -115,38 +107,72 @@ void processArguments() throws Exception {
     @Parameters(commandDescription = "Run a Pulsar IO source connector locally 
(rather than deploying it to the Pulsar cluster)")
     protected class LocalSourceRunner extends CreateSource {
 
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
+        // for backwards compatibility purposes
+        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker", hidden = true)
+        protected String DEPRECATED_brokerServiceUrl;
+        @Parameter(names = "--broker-service-url", description = "The URL for 
the Pulsar broker")
         protected String brokerServiceUrl;
 
-        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker")
+        // for backwards compatibility purposes
+        @Parameter(names = "--clientAuthPlugin", description = "Client 
authentication plugin using which function-process can connect to broker", 
hidden = true)
+        protected String DEPRECATED_clientAuthPlugin;
+        @Parameter(names = "--client-auth-plugin", description = "Client 
authentication plugin using which function-process can connect to broker")
         protected String clientAuthPlugin;
 
-        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--clientAuthParams", description = "Client 
authentication param", hidden = true)
+        protected String DEPRECATED_clientAuthParams;
+        @Parameter(names = "--client-auth-params", description = "Client 
authentication param")
         protected String clientAuthParams;
 
-        @Parameter(names = "--use_tls", description = "Use tls connection\n")
+        // for backwards compatibility purposes
+        @Parameter(names = "--use_tls", description = "Use tls connection\n", 
hidden = true)
+        protected Boolean DEPRECATED_useTls;
+        @Parameter(names = "--use-tls", description = "Use tls connection\n")
         protected boolean useTls;
 
-        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n")
+        // for backwards compatibility purposes
+        @Parameter(names = "--tls_allow_insecure", description = "Allow 
insecure tls connection\n", hidden = true)
+        protected Boolean DEPRECATED_tlsAllowInsecureConnection;
+        @Parameter(names = "--tls-allow-insecure", description = "Allow 
insecure tls connection\n")
         protected boolean tlsAllowInsecureConnection;
 
-        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification")
+        // for backwards compatibility purposes
+        @Parameter(names = "--hostname_verification_enabled", description = 
"Enable hostname verification", hidden = true)
+        protected Boolean DEPRECATED_tlsHostNameVerificationEnabled;
+        @Parameter(names = "--hostname-verification-enabled", description = 
"Enable hostname verification")
         protected boolean tlsHostNameVerificationEnabled;
 
-        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path")
+        // for backwards compatibility purposes
+        @Parameter(names = "--tls_trust_cert_path", description = "tls trust 
cert file path", hidden = true)
+        protected String DEPRECATED_tlsTrustCertFilePath;
+        @Parameter(names = "--tls-trust-cert-path", description = "tls trust 
cert file path")
         protected String tlsTrustCertFilePath;
 
+        private void mergeArgs() {
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) 
brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) 
clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthParams)) 
clientAuthParams = DEPRECATED_clientAuthParams;
+            if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
+            if (DEPRECATED_tlsAllowInsecureConnection != null) 
tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
+            if (DEPRECATED_tlsHostNameVerificationEnabled != null) 
tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) 
tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+        }
+
         @Override
         void runCmd() throws Exception {
-                
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), 
sourceConfig.getParallelism(),
-                    0, brokerServiceUrl, null,
-                    
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    sourceConfig.getArchive(), admin);
-        }
+            // merge deprecated args with new args
+            mergeArgs();
+            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), 
sourceConfig.getParallelism(),
+                0, brokerServiceUrl, null,
+                
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+                        
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                        .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                        
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                sourceConfig.getArchive(), admin);
+    }
 
         @Override
         protected String validateSourceType(String sourceType) throws 
IOException {
@@ -205,36 +231,73 @@ void runCmd() throws Exception {
         @Parameter(names = { "-t", "--source-type" }, description = "The 
source's connector provider")
         protected String sourceType;
 
-        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the Source")
+        // for backwards compatibility purposes
+        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the source", hidden = 
true)
+        protected FunctionConfig.ProcessingGuarantees 
DEPRECATED_processingGuarantees;
+        @Parameter(names = "--processing-guarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the source")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--destinationTopicName", description = "The Pulsar 
topic to which data is sent")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--destinationTopicName", description = "The Pulsar 
topic to which data is sent", hidden = true)
+        protected String DEPRECATED_destinationTopicName;
+        @Parameter(names = "--destination-topic-name", description = "The 
Pulsar topic to which data is sent")
         protected String destinationTopicName;
-        @Parameter(names = "--deserializationClassName", description = "The 
SerDe classname for the source")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--deserializationClassName", description = "The 
SerDe classname for the source", hidden = true)
+        protected String DEPRECATED_deserializationClassName;
+        @Parameter(names = "--deserialization-classname", description = "The 
SerDe classname for the source")
         protected String deserializationClassName;
+
         @Parameter(names = "--parallelism", description = "The source's 
parallelism factor (i.e. the number of source instances to run)")
         protected Integer parallelism;
         @Parameter(names = { "-a", "--archive" },
                 description = "The path to the NAR archive for the Source. It 
also supports url-path [http/https/file (file protocol assumes that file 
already exists on worker host)] from which worker can download the package.", 
listConverter = StringConverter.class)
         protected String archive;
-        @Parameter(names = "--className", description = "The source's class 
name if archive is file-url-path (file://)")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--className", description = "The source's class 
name if archive is file-url-path (file://)", hidden = true)
+        protected String DEPRECATED_className;
+        @Parameter(names = "--classname", description = "The source's class 
name if archive is file-url-path (file://)")
         protected String className;
+
+        // for backwards compatibility purposes
         @Parameter(names = "--sourceConfigFile", description = "The path to a 
YAML config file specifying the "
+                + "source's configuration", hidden = true)
+        protected String DEPRECATED_sourceConfigFile;
+        @Parameter(names = "--source-config-file", description = "The path to 
a YAML config file specifying the "
                 + "source's configuration")
         protected String sourceConfigFile;
+
         @Parameter(names = "--cpu", description = "The CPU (in cores) that 
needs to be allocated per source instance (applicable only to Docker runtime)")
         protected Double cpu;
         @Parameter(names = "--ram", description = "The RAM (in bytes) that 
need to be allocated per source instance (applicable only to the process and 
Docker runtimes)")
         protected Long ram;
         @Parameter(names = "--disk", description = "The disk (in bytes) that 
need to be allocated per source instance (applicable only to Docker runtime)")
         protected Long disk;
-        @Parameter(names = "--sourceConfig", description = "Source config 
key/values")
+
+        // for backwards compatibility purposes
+        @Parameter(names = "--sourceConfig", description = "Source config 
key/values", hidden = true)
+        protected String DEPRECATED_sourceConfigString;
+        @Parameter(names = "--source-config", description = "Source config 
key/values")
         protected String sourceConfigString;
 
         protected SourceConfig sourceConfig;
 
+        private void mergeArgs() {
+            if (DEPRECATED_processingGuarantees != null) processingGuarantees 
= DEPRECATED_processingGuarantees;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_destinationTopicName)) 
destinationTopicName = DEPRECATED_destinationTopicName;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_deserializationClassName))
 deserializationClassName = DEPRECATED_deserializationClassName;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_className)) className 
= DEPRECATED_className;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigFile)) 
sourceConfigFile = DEPRECATED_sourceConfigFile;
+            if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigString)) 
sourceConfigString = DEPRECATED_sourceConfigString;
+        }
+
         @Override
         void processArguments() throws Exception {
             super.processArguments();
+            // merge deprecated args with new args
+            mergeArgs();
 
             if (null != sourceConfigFile) {
                 this.sourceConfig = CmdUtils.loadConfig(sourceConfigFile, 
SourceConfig.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to