rdhabalia closed pull request #2503: add auto ack and timeout configurable
URL: https://github.com/apache/incubator-pulsar/pull/2503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 38a55bf47c..8f9eefea94 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ void runCmd() throws Exception {
         protected String DEPRECATED_sinkConfigString;
         @Parameter(names = "--sink-config", description = "User defined 
configs key/values")
         protected String sinkConfigString;
+        @Parameter(names = "--auto-ack", description = "Whether or not the 
framework will automatically acknowleges messages", arity = 1)
+        protected boolean autoAck = true;
+        @Parameter(names = "--timeout-ms", description = "The message timeout 
in milliseconds")
+        protected Long timeoutMs;
 
         protected SinkConfig sinkConfig;
 
@@ -399,6 +403,15 @@ void processArguments() throws Exception {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
 
+            sinkConfig.setAutoAck(autoAck);
+            if (timeoutMs != null) {
+                sinkConfig.setTimeoutMs(timeoutMs);
+            }
+            
+            if (null != sinkConfigString) {
+                sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+            }
+            
             inferMissingArguments(sinkConfig);
         }
 
@@ -585,7 +598,11 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
                             : SubscriptionType.SHARED;
             sourceSpecBuilder.setSubscriptionType(subType);
 
-            functionDetailsBuilder.setAutoAck(true);
+            functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+            if (sinkConfig.getTimeoutMs() != null) {
+                sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+            }
+            
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
             // set up sink spec
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index e3ba70e025..fb13d39913 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -133,6 +133,7 @@ public SinkConfig getSinkConfig() {
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
         sinkConfig.setName(NAME);
+        sinkConfig.setAutoAck(true);
 
         sinkConfig.setInputs(INPUTS_LIST);
         sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index be886c41ae..1132fa6b86 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -78,6 +78,9 @@
     private boolean retainOrdering;
     @isValidResources
     private Resources resources;
+    private boolean autoAck;
+    @isPositiveNumber
+    private Long timeoutMs;
 
     @isFileExists
     private String archive;
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 5b1fb41d8b..42f6c9c181 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1014,6 +1014,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `update`
@@ -1091,6 +1093,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a 
namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `available-sinks`


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to