This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7565ece allow users to specify message timeout for functions (#1885) 7565ece is described below commit 7565ece49424c2efaa1c180564b414c51a832a81 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Jun 1 13:38:56 2018 -0700 allow users to specify message timeout for functions (#1885) * allow users to specify message timeout * adding for python functions * adding validation * fixing bug in python * adding missing license header --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 11 +++++- .../functions/instance/JavaInstanceRunnable.java | 4 ++ .../pulsar/functions/source/PulsarSource.java | 12 ++++-- .../functions/source/PulsarSourceConfig.java | 1 + .../instance/src/main/python/Function_pb2.py | 43 +++++++++++++--------- .../instance/src/main/python/python_instance.py | 4 +- .../src/main/python/python_instance_main.py | 3 ++ .../proto/src/main/proto/Function.proto | 1 + .../pulsar/functions/runtime/JavaInstanceMain.java | 6 +++ .../pulsar/functions/runtime/ProcessRuntime.java | 5 +++ .../pulsar/functions/utils/FunctionConfig.java | 2 + .../functions/utils/validation/ValidatorImpls.java | 8 +++- 12 files changed, 75 insertions(+), 25 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index b3e4f6b..8590dd5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -248,8 +248,10 @@ public class CmdFunctions extends CmdBase { protected Integer slidingIntervalCount; @Parameter(names = "--slidingIntervalDurationMs", description = "The time duration after which the window slides") protected Long slidingIntervalDurationMs; - @Parameter(names = "--autoAck", description = "") + @Parameter(names = "--autoAck", description = "Whether or not the framework will automatically acknowleges messages") protected Boolean autoAck; + @Parameter(names = "--timeoutMs", description = "The message timeout in milliseconds") + protected Long timeoutMs; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -323,6 +325,10 @@ public class CmdFunctions extends CmdBase { functionConfig.setResources(new org.apache.pulsar.functions.utils.Resources(cpu, ram, disk)); + if (timeoutMs != null) { + functionConfig.setTimeoutMs(timeoutMs); + } + // window configs WindowConfig windowConfig = functionConfig.getWindowConfig(); if (null != windowLengthCount) { @@ -507,6 +513,9 @@ public class CmdFunctions extends CmdBase { if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); } + if (functionConfig.getTimeoutMs() != null) { + sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs()); + } functionDetailsBuilder.setSource(sourceSpecBuilder); // Setup sink diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 961f28e..6bb689c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -482,6 +482,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName()); + if (sourceSpec.getTimeoutMs() > 0 ) { + pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); + } + Object[] params = {this.client, pulsarSourceConfig}; Class[] paramTypes = {PulsarClient.class, PulsarSourceConfig.class}; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index c27bda8..f3a9805 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; @@ -60,12 +61,15 @@ public class PulsarSource<T> implements Source<T> { setupSerDe(); // Setup pulsar consumer - this.inputConsumer = this.pulsarClient.newConsumer() + ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer() .topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())) .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) - .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()) - .ackTimeout(1, TimeUnit.MINUTES) - .subscribe(); + .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()); + + if (pulsarSourceConfig.getTimeoutMs() != null) { + consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); + } + this.inputConsumer = consumerBuilder.subscribe(); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java index 95c1001..6c45486 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java @@ -39,6 +39,7 @@ public class PulsarSourceConfig { private String subscriptionName; private Map<String, String> topicSerdeClassNameMap; private String typeClassName; + private Long timeoutMs; public static PulsarSourceConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index e6c8cd8..c9513c5 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...] ) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1225, - serialized_end=1304, + serialized_start=1244, + serialized_end=1323, ) _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) @@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1306, - serialized_end=1350, + serialized_start=1325, + serialized_end=1369, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) @@ -316,8 +316,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=690, - serialized_end=751, + serialized_start=709, + serialized_end=770, ) _SOURCESPEC = _descriptor.Descriptor( @@ -362,6 +362,13 @@ _SOURCESPEC = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=5, + number=6, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -375,7 +382,7 @@ _SOURCESPEC = _descriptor.Descriptor( oneofs=[ ], serialized_start=487, - serialized_end=751, + serialized_end=770, ) @@ -433,8 +440,8 @@ _SINKSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=753, - serialized_end=861, + serialized_start=772, + serialized_end=880, ) @@ -464,8 +471,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=863, - serialized_end=909, + serialized_start=882, + serialized_end=928, ) @@ -516,8 +523,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=912, - serialized_end=1073, + serialized_start=931, + serialized_end=1092, ) @@ -554,8 +561,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1075, - serialized_end=1156, + serialized_start=1094, + serialized_end=1175, ) @@ -592,8 +599,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1158, - serialized_end=1223, + serialized_start=1177, + serialized_end=1242, ) _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 7d6dc65..7120249 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -120,6 +120,7 @@ class PythonInstance(object): self.total_stats = Stats() self.current_stats = Stats() self.last_health_check_ts = time.time() + self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None def health_check(self): self.last_health_check_ts = time.time() @@ -154,7 +155,8 @@ class PythonInstance(object): self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, - message_listener=partial(self.message_listener, topic, self.input_serdes[topic]) + message_listener=partial(self.message_listener, topic, self.input_serdes[topic]), + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None ) function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 0c9c5bf..ca490f8 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -72,6 +72,7 @@ def main(): parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages') parser.add_argument('--source_subscription_type', required=True, help='Subscription Type') parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe') + parser.add_argument('--source_timeout_ms', required=False, help='Source message timeout in milliseconds') parser.add_argument('--sink_topic', required=False, help='Sink Topic') parser.add_argument('--sink_serde_classname', required=False, help='Sink SerDe classname') @@ -101,6 +102,8 @@ def main(): log.critical("source_topics_serde_classname cannot be empty") for topics, serde_classname in source_topics_serde_classname_dict.items(): sourceSpec.topicsToSerDeClassName[topics] = serde_classname + if args.source_timeout_ms: + sourceSpec.timeoutMs = long(args.source_timeout_ms) function_details.source.MergeFrom(sourceSpec) sinkSpec = Function_pb2.SinkSpec() diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 07238eb..f9865c0 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -69,6 +69,7 @@ message SourceSpec { // configs used only when source feeds into functions SubscriptionType subscriptionType = 3; map<string,string> topicsToSerDeClassName = 4; + uint64 timeoutMs = 6; } message SinkSpec { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index c01f6bb..9a17567 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -111,6 +111,9 @@ public class JavaInstanceMain implements AutoCloseable { @Parameter(names = "--source_topics_serde_classname", description = "A map of topics to SerDe for the source") protected String sourceTopicsSerdeClassName; + @Parameter(names = "--source_timeout_ms", description = "Source message timeout in milliseconds") + protected Long sourceTimeoutMs; + @Parameter(names = "--sink_type_classname", description = "The injest type of the sink", required = true) protected String sinkTypeClassName; @@ -170,6 +173,9 @@ public class JavaInstanceMain implements AutoCloseable { sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType)); sourceDetailsBuilder.putAllTopicsToSerDeClassName(new Gson().fromJson(sourceTopicsSerdeClassName, Map.class)); sourceDetailsBuilder.setTypeClassName(sourceTypeClassName); + if (sourceTimeoutMs != null) { + sourceDetailsBuilder.setTimeoutMs(sourceTimeoutMs); + } functionDetailsBuilder.setSource(sourceDetailsBuilder); // Setup sink diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index c5d0109..82b2d3b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -160,6 +160,11 @@ class ProcessRuntime implements Runtime { args.add(instanceConfig.getFunctionDetails().getSource().getTypeClassName()); } } + + if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() > 0) { + args.add("--source_timeout_ms"); + args.add(String.valueOf(instanceConfig.getFunctionDetails().getSource().getTimeoutMs())); + } args.add("--source_subscription_type"); args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString()); args.add("--source_topics_serde_classname"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 742f530..be40e01 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -93,4 +93,6 @@ public class FunctionConfig { private String fqfn; @isValidWindowConfig private WindowConfig windowConfig; + @isPositiveNumber + private Long timeoutMs; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index bc86c2d..5bebf19 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -337,7 +337,6 @@ public class ValidatorImpls { // implements SerDe class functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> { - Class<?> serdeClass; try { serdeClass = loadClass(inputSerializer); @@ -459,6 +458,13 @@ public class ValidatorImpls { } functionConfig.setAutoAck(false); } + + if (functionConfig.getTimeoutMs() != null + && functionConfig.getProcessingGuarantees() != null + && functionConfig.getProcessingGuarantees() != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) { + throw new IllegalArgumentException("Message timeout can only be specifed with processing guarantee is " + + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name()); + } } @Override -- To stop receiving notification emails like this one, please contact jerryp...@apache.org.