This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7565ece   allow users to specify message timeout for functions (#1885)
7565ece is described below

commit 7565ece49424c2efaa1c180564b414c51a832a81
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Jun 1 13:38:56 2018 -0700

     allow users to specify message timeout for functions (#1885)
    
    * allow users to specify message timeout
    
    * adding for python functions
    
    * adding validation
    
    * fixing bug in python
    
    * adding missing license header
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 11 +++++-
 .../functions/instance/JavaInstanceRunnable.java   |  4 ++
 .../pulsar/functions/source/PulsarSource.java      | 12 ++++--
 .../functions/source/PulsarSourceConfig.java       |  1 +
 .../instance/src/main/python/Function_pb2.py       | 43 +++++++++++++---------
 .../instance/src/main/python/python_instance.py    |  4 +-
 .../src/main/python/python_instance_main.py        |  3 ++
 .../proto/src/main/proto/Function.proto            |  1 +
 .../pulsar/functions/runtime/JavaInstanceMain.java |  6 +++
 .../pulsar/functions/runtime/ProcessRuntime.java   |  5 +++
 .../pulsar/functions/utils/FunctionConfig.java     |  2 +
 .../functions/utils/validation/ValidatorImpls.java |  8 +++-
 12 files changed, 75 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b3e4f6b..8590dd5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -248,8 +248,10 @@ public class CmdFunctions extends CmdBase {
         protected Integer slidingIntervalCount;
         @Parameter(names = "--slidingIntervalDurationMs", description = "The 
time duration after which the window slides")
         protected Long slidingIntervalDurationMs;
-        @Parameter(names = "--autoAck", description = "")
+        @Parameter(names = "--autoAck", description = "Whether or not the 
framework will automatically acknowleges messages")
         protected Boolean autoAck;
+        @Parameter(names = "--timeoutMs", description = "The message timeout 
in milliseconds")
+        protected Long timeoutMs;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
 
@@ -323,6 +325,10 @@ public class CmdFunctions extends CmdBase {
 
             functionConfig.setResources(new 
org.apache.pulsar.functions.utils.Resources(cpu, ram, disk));
 
+            if (timeoutMs != null) {
+                functionConfig.setTimeoutMs(timeoutMs);
+            }
+
             // window configs
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (null != windowLengthCount) {
@@ -507,6 +513,9 @@ public class CmdFunctions extends CmdBase {
             if (typeArgs != null) {
                 sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
             }
+            if (functionConfig.getTimeoutMs() != null) {
+                sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
+            }
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
             // Setup sink
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 961f28e..6bb689c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -482,6 +482,10 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 
             pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
 
+            if (sourceSpec.getTimeoutMs() > 0 ) {
+                pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
+            }
+
             Object[] params = {this.client, pulsarSourceConfig};
             Class[] paramTypes = {PulsarClient.class, 
PulsarSourceConfig.class};
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index c27bda8..f3a9805 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
@@ -60,12 +61,15 @@ public class PulsarSource<T> implements Source<T> {
         setupSerDe();
 
         // Setup pulsar consumer
-        this.inputConsumer = this.pulsarClient.newConsumer()
+        ConsumerBuilder<byte[]> consumerBuilder = 
this.pulsarClient.newConsumer()
                 .topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()))
                 
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType())
-                .ackTimeout(1, TimeUnit.MINUTES)
-                .subscribe();
+                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
+
+        if (pulsarSourceConfig.getTimeoutMs() != null) {
+            consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        }
+        this.inputConsumer = consumerBuilder.subscribe();
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 95c1001..6c45486 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -39,6 +39,7 @@ public class PulsarSourceConfig {
     private String subscriptionName;
     private Map<String, String> topicSerdeClassNameMap;
     private String typeClassName;
+    private Long timeoutMs;
 
     public static PulsarSourceConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index e6c8cd8..c9513c5 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1225,
-  serialized_end=1304,
+  serialized_start=1244,
+  serialized_end=1323,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1306,
-  serialized_end=1350,
+  serialized_start=1325,
+  serialized_end=1369,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -316,8 +316,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=690,
-  serialized_end=751,
+  serialized_start=709,
+  serialized_end=770,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -362,6 +362,13 @@ _SOURCESPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=5,
+      number=6, type=4, cpp_type=4, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -375,7 +382,7 @@ _SOURCESPEC = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=487,
-  serialized_end=751,
+  serialized_end=770,
 )
 
 
@@ -433,8 +440,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=753,
-  serialized_end=861,
+  serialized_start=772,
+  serialized_end=880,
 )
 
 
@@ -464,8 +471,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=863,
-  serialized_end=909,
+  serialized_start=882,
+  serialized_end=928,
 )
 
 
@@ -516,8 +523,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=912,
-  serialized_end=1073,
+  serialized_start=931,
+  serialized_end=1092,
 )
 
 
@@ -554,8 +561,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1075,
-  serialized_end=1156,
+  serialized_start=1094,
+  serialized_end=1175,
 )
 
 
@@ -592,8 +599,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1158,
-  serialized_end=1223,
+  serialized_start=1177,
+  serialized_end=1242,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 7d6dc65..7120249 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -120,6 +120,7 @@ class PythonInstance(object):
     self.total_stats = Stats()
     self.current_stats = Stats()
     self.last_health_check_ts = time.time()
+    self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -154,7 +155,8 @@ class PythonInstance(object):
       self.consumers[topic] = self.pulsar_client.subscribe(
         str(topic), subscription_name,
         consumer_type=mode,
-        message_listener=partial(self.message_listener, topic, 
self.input_serdes[topic])
+        message_listener=partial(self.message_listener, topic, 
self.input_serdes[topic]),
+        unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms 
else None
       )
 
     function_kclass = util.import_class(os.path.dirname(self.user_code), 
self.instance_config.function_details.className)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0c9c5bf..ca490f8 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -72,6 +72,7 @@ def main():
   parser.add_argument('--log_topic', required=False, help='Topic to send Log 
Messages')
   parser.add_argument('--source_subscription_type', required=True, 
help='Subscription Type')
   parser.add_argument('--source_topics_serde_classname', required=True, 
help='A mapping of Input topics to SerDe')
+  parser.add_argument('--source_timeout_ms', required=False, help='Source 
message timeout in milliseconds')
   parser.add_argument('--sink_topic', required=False, help='Sink Topic')
   parser.add_argument('--sink_serde_classname', required=False, help='Sink 
SerDe classname')
 
@@ -101,6 +102,8 @@ def main():
     log.critical("source_topics_serde_classname cannot be empty")
   for topics, serde_classname in source_topics_serde_classname_dict.items():
     sourceSpec.topicsToSerDeClassName[topics] = serde_classname
+  if args.source_timeout_ms:
+    sourceSpec.timeoutMs = long(args.source_timeout_ms)
   function_details.source.MergeFrom(sourceSpec)
 
   sinkSpec = Function_pb2.SinkSpec()
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 07238eb..f9865c0 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -69,6 +69,7 @@ message SourceSpec {
     // configs used only when source feeds into functions
     SubscriptionType subscriptionType = 3;
     map<string,string> topicsToSerDeClassName = 4;
+    uint64 timeoutMs = 6;
 }
 
 message SinkSpec {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index c01f6bb..9a17567 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -111,6 +111,9 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--source_topics_serde_classname", description = "A map 
of topics to SerDe for the source")
     protected String sourceTopicsSerdeClassName;
 
+    @Parameter(names = "--source_timeout_ms", description = "Source message 
timeout in milliseconds")
+    protected Long sourceTimeoutMs;
+
     @Parameter(names = "--sink_type_classname", description = "The injest type 
of the sink", required = true)
     protected String sinkTypeClassName;
 
@@ -170,6 +173,9 @@ public class JavaInstanceMain implements AutoCloseable {
         
sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType));
         sourceDetailsBuilder.putAllTopicsToSerDeClassName(new 
Gson().fromJson(sourceTopicsSerdeClassName, Map.class));
         sourceDetailsBuilder.setTypeClassName(sourceTypeClassName);
+        if (sourceTimeoutMs != null) {
+            sourceDetailsBuilder.setTimeoutMs(sourceTimeoutMs);
+        }
         functionDetailsBuilder.setSource(sourceDetailsBuilder);
 
         // Setup sink
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c5d0109..82b2d3b 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -160,6 +160,11 @@ class ProcessRuntime implements Runtime {
                 
args.add(instanceConfig.getFunctionDetails().getSource().getTypeClassName());
             }
         }
+
+        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() > 
0) {
+            args.add("--source_timeout_ms");
+            
args.add(String.valueOf(instanceConfig.getFunctionDetails().getSource().getTimeoutMs()));
+        }
         args.add("--source_subscription_type");
         
args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString());
         args.add("--source_topics_serde_classname");
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 742f530..be40e01 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -93,4 +93,6 @@ public class FunctionConfig {
     private String fqfn;
     @isValidWindowConfig
     private WindowConfig windowConfig;
+    @isPositiveNumber
+    private Long timeoutMs;
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index bc86c2d..5bebf19 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -337,7 +337,6 @@ public class ValidatorImpls {
             // implements SerDe class
             functionConfig.getCustomSerdeInputs().forEach((topicName, 
inputSerializer) -> {
 
-
                 Class<?> serdeClass;
                 try {
                     serdeClass = loadClass(inputSerializer);
@@ -459,6 +458,13 @@ public class ValidatorImpls {
                 }
                 functionConfig.setAutoAck(false);
             }
+
+            if (functionConfig.getTimeoutMs() != null
+                    && functionConfig.getProcessingGuarantees() != null
+                    && functionConfig.getProcessingGuarantees() != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
+                throw new IllegalArgumentException("Message timeout can only 
be specifed with processing guarantee is "
+                        + 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
+            }
         }
 
         @Override

-- 
To stop receiving notification emails like this one, please contact
jerryp...@apache.org.

Reply via email to