rdhabalia closed pull request #2503: add auto ack and timeout configurable URL: https://github.com/apache/incubator-pulsar/pull/2503
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 38a55bf47c..8f9eefea94 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -291,6 +291,10 @@ void runCmd() throws Exception { protected String DEPRECATED_sinkConfigString; @Parameter(names = "--sink-config", description = "User defined configs key/values") protected String sinkConfigString; + @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1) + protected boolean autoAck = true; + @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds") + protected Long timeoutMs; protected SinkConfig sinkConfig; @@ -399,6 +403,15 @@ void processArguments() throws Exception { sinkConfig.setConfigs(parseConfigs(sinkConfigString)); } + sinkConfig.setAutoAck(autoAck); + if (timeoutMs != null) { + sinkConfig.setTimeoutMs(timeoutMs); + } + + if (null != sinkConfigString) { + sinkConfig.setConfigs(parseConfigs(sinkConfigString)); + } + inferMissingArguments(sinkConfig); } @@ -585,7 +598,11 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep : SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); - functionDetailsBuilder.setAutoAck(true); + functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck()); + if (sinkConfig.getTimeoutMs() != null) { + sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs()); + } + functionDetailsBuilder.setSource(sourceSpecBuilder); // set up sink spec diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index e3ba70e025..fb13d39913 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -133,6 +133,7 @@ public SinkConfig getSinkConfig() { sinkConfig.setTenant(TENANT); sinkConfig.setNamespace(NAMESPACE); sinkConfig.setName(NAME); + sinkConfig.setAutoAck(true); sinkConfig.setInputs(INPUTS_LIST); sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index be886c41ae..1132fa6b86 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -78,6 +78,9 @@ private boolean retainOrdering; @isValidResources private Resources resources; + private boolean autoAck; + @isPositiveNumber + private Long timeoutMs; @isFileExists private String archive; diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 5b1fb41d8b..42f6c9c181 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1014,6 +1014,8 @@ Options |`--sink-type`|The built-in sinks's connector provider|| |`--topics-pattern`|TopicsPattern to consume from list of topics under a namespace that match the pattern.|| |`--tenant`|The sink’s tenant|| +|`--auto-ack`|Let the functions framework manage acking|| +|`--timeout-ms`|The message timeout in milliseconds|| ### `update` @@ -1091,6 +1093,8 @@ Options |`--sink-type`|The built-in sinks's connector provider|| |`--topics-pattern`|TopicsPattern to consume from list of topics under a namespace that match the pattern.|| |`--tenant`|The sink’s tenant|| +|`--auto-ack`|Let the functions framework manage acking|| +|`--timeout-ms`|The message timeout in milliseconds|| ### `available-sinks` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services