This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6b24ab9 Log Topic for Functions (#1356) 6b24ab9 is described below commit 6b24ab9cbb7a4d471c05c7244b84243936cba4f6 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Mar 7 13:02:33 2018 -0800 Log Topic for Functions (#1356) --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 5 + .../instance/src/main/python/Function_pb2.py | 153 +++++++++++++++------ pulsar-functions/instance/src/main/python/log.py | 4 +- .../proto/src/main/proto/Function.proto | 1 + pulsar-functions/python-examples/logfunction.py | 30 ++++ .../pulsar/functions/runtime/JavaInstanceMain.java | 6 + .../pulsar/functions/runtime/ProcessRuntime.java | 5 + .../functions/runtime/ProcessRuntimeTest.java | 7 +- 8 files changed, 164 insertions(+), 47 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 553f1c6..c8010e7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -143,6 +143,8 @@ public class CmdFunctions extends CmdBase { protected String inputs; @Parameter(names = "--output", description = "Output Topic Name") protected String output; + @Parameter(names = "--logTopic", description = "Log Topic") + protected String logTopic; @Parameter(names = "--customSerdeInputs", description = "Map of input topic to serde classname") protected String customSerdeInputString; @Parameter(names = "--outputSerdeClassName", description = "Output SerDe") @@ -184,6 +186,9 @@ public class CmdFunctions extends CmdBase { if (null != output) { functionConfigBuilder.setOutput(output); } + if (null != logTopic) { + functionConfigBuilder.setLogTopic(logTopic); + } if (null != tenant) { functionConfigBuilder.setTenant(tenant); } diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index 8566806..95fcd7a 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -16,8 +16,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# - +# + # -*- encoding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! @@ -38,10 +38,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='Function.proto', + name='pulsar-functions/proto/src/main/proto/Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xdb\x05\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x0e\n\x06inputs\x18\x0e \x03(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12H\n\x14processingGuarantees\x18\t \ [...] + serialized_pb=_b('\n4pulsar-functions/proto/src/main/proto/Function.proto\x12\x05proto\"\x98\x06\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 \x01(\t\x12H [...] ) @@ -53,18 +53,22 @@ _FUNCTIONCONFIG_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( file=DESCRIPTOR, values=[ _descriptor.EnumValueDescriptor( - name='ATMOST_ONCE', index=0, number=0, + name='ATLEAST_ONCE', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ATMOST_ONCE', index=1, number=1, options=None, type=None), _descriptor.EnumValueDescriptor( - name='ATLEAST_ONCE', index=1, number=1, + name='EFFECTIVELY_ONCE', index=2, number=2, options=None, type=None), ], containing_type=None, options=None, - serialized_start=620, - serialized_end=677, + serialized_start=697, + serialized_end=776, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_PROCESSINGGUARANTEES) @@ -85,8 +89,8 @@ _FUNCTIONCONFIG_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=679, - serialized_end=724, + serialized_start=778, + serialized_end=823, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_SUBSCRIPTIONTYPE) @@ -107,8 +111,8 @@ _FUNCTIONCONFIG_RUNTIME = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=726, - serialized_end=757, + serialized_start=825, + serialized_end=856, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_RUNTIME) @@ -146,8 +150,8 @@ _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=511, - serialized_end=567, + serialized_start=588, + serialized_end=644, ) _FUNCTIONCONFIG_USERCONFIGENTRY = _descriptor.Descriptor( @@ -183,8 +187,8 @@ _FUNCTIONCONFIG_USERCONFIGENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=569, - serialized_end=618, + serialized_start=646, + serialized_end=695, ) _FUNCTIONCONFIG = _descriptor.Descriptor( @@ -223,29 +227,29 @@ _FUNCTIONCONFIG = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='inputs', full_name='proto.FunctionConfig.inputs', index=4, - number=14, type=9, cpp_type=9, label=3, + name='customSerdeInputs', full_name='proto.FunctionConfig.customSerdeInputs', index=4, + number=5, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='customSerdeInputs', full_name='proto.FunctionConfig.customSerdeInputs', index=5, - number=5, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], + name='outputSerdeClassName', full_name='proto.FunctionConfig.outputSerdeClassName', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='outputSerdeClassName', full_name='proto.FunctionConfig.outputSerdeClassName', index=6, - number=6, type=9, cpp_type=9, label=1, + name='output', full_name='proto.FunctionConfig.output', index=6, + number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='output', full_name='proto.FunctionConfig.output', index=7, - number=7, type=9, cpp_type=9, label=1, + name='logTopic', full_name='proto.FunctionConfig.logTopic', index=7, + number=8, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -285,6 +289,20 @@ _FUNCTIONCONFIG = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='inputs', full_name='proto.FunctionConfig.inputs', index=13, + number=14, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='parallelism', full_name='proto.FunctionConfig.parallelism', index=14, + number=15, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -300,8 +318,8 @@ _FUNCTIONCONFIG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=26, - serialized_end=757, + serialized_start=64, + serialized_end=856, ) @@ -331,8 +349,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=759, - serialized_end=805, + serialized_start=858, + serialized_end=904, ) @@ -383,8 +401,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=808, - serialized_end=967, + serialized_start=907, + serialized_end=1066, ) @@ -421,8 +439,46 @@ _SNAPSHOT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=969, - serialized_end=1064, + serialized_start=1068, + serialized_end=1163, +) + + +_INSTANCE = _descriptor.Descriptor( + name='Instance', + full_name='proto.Instance', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='functionMetaData', full_name='proto.Instance.functionMetaData', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instanceId', full_name='proto.Instance.instanceId', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1165, + serialized_end=1246, ) @@ -434,7 +490,7 @@ _ASSIGNMENT = _descriptor.Descriptor( containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='functionMetaData', full_name='proto.Assignment.functionMetaData', index=0, + name='instance', full_name='proto.Assignment.instance', index=0, number=1, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -459,8 +515,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1066, - serialized_end=1147, + serialized_start=1248, + serialized_end=1313, ) _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONCONFIG @@ -476,11 +532,13 @@ _FUNCTIONCONFIG_RUNTIME.containing_type = _FUNCTIONCONFIG _FUNCTIONMETADATA.fields_by_name['functionConfig'].message_type = _FUNCTIONCONFIG _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA _SNAPSHOT.fields_by_name['functionMetaDataList'].message_type = _FUNCTIONMETADATA -_ASSIGNMENT.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA +_INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA +_ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE DESCRIPTOR.message_types_by_name['FunctionConfig'] = _FUNCTIONCONFIG DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA DESCRIPTOR.message_types_by_name['Snapshot'] = _SNAPSHOT +DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -488,19 +546,19 @@ FunctionConfig = _reflection.GeneratedProtocolMessageType('FunctionConfig', (_me CustomSerdeInputsEntry = _reflection.GeneratedProtocolMessageType('CustomSerdeInputsEntry', (_message.Message,), dict( DESCRIPTOR = _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.FunctionConfig.CustomSerdeInputsEntry) )) , UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict( DESCRIPTOR = _FUNCTIONCONFIG_USERCONFIGENTRY, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.FunctionConfig.UserConfigEntry) )) , DESCRIPTOR = _FUNCTIONCONFIG, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.FunctionConfig) )) _sym_db.RegisterMessage(FunctionConfig) @@ -509,28 +567,35 @@ _sym_db.RegisterMessage(FunctionConfig.UserConfigEntry) PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), dict( DESCRIPTOR = _PACKAGELOCATIONMETADATA, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.PackageLocationMetaData) )) _sym_db.RegisterMessage(PackageLocationMetaData) FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData', (_message.Message,), dict( DESCRIPTOR = _FUNCTIONMETADATA, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.FunctionMetaData) )) _sym_db.RegisterMessage(FunctionMetaData) Snapshot = _reflection.GeneratedProtocolMessageType('Snapshot', (_message.Message,), dict( DESCRIPTOR = _SNAPSHOT, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.Snapshot) )) _sym_db.RegisterMessage(Snapshot) +Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict( + DESCRIPTOR = _INSTANCE, + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' + # @@protoc_insertion_point(class_scope:proto.Instance) + )) +_sym_db.RegisterMessage(Instance) + Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Message,), dict( DESCRIPTOR = _ASSIGNMENT, - __module__ = 'Function_pb2' + __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2' # @@protoc_insertion_point(class_scope:proto.Assignment) )) _sym_db.RegisterMessage(Assignment) diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py index 85b2104..18bde78 100644 --- a/pulsar-functions/instance/src/main/python/log.py +++ b/pulsar-functions/instance/src/main/python/log.py @@ -37,6 +37,7 @@ date_format = "%Y-%m-%d %H:%M:%S %z" class LogTopicHandler(logging.Handler): def __init__(self, topic_name, pulsar_client): + logging.Handler.__init__(self) Log.info("Setting up producer for log topic %s" % topic_name) self.producer = pulsar_client.create_producer( str(topic_name), @@ -46,7 +47,8 @@ class LogTopicHandler(logging.Handler): compression_type=pulsar._pulsar.CompressionType.LZ4) def emit(self, record): - self.producer.send_async(record) + msg = self.format(record) + self.producer.send_async(str(msg), None) def configure(level=logging.INFO): """ Configure logger which dumps log on terminal diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 2beab12..4b74dc7 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -45,6 +45,7 @@ message FunctionConfig { map<string, string> customSerdeInputs = 5; string outputSerdeClassName = 6; string output = 7; + string logTopic = 8; ProcessingGuarantees processingGuarantees = 9; map<string,string> userConfig = 10; SubscriptionType subscriptionType = 11; diff --git a/pulsar-functions/python-examples/logfunction.py b/pulsar-functions/python-examples/logfunction.py new file mode 100755 index 0000000..4c59124 --- /dev/null +++ b/pulsar-functions/python-examples/logfunction.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsarfunction import pulsar_function + +class LogFunction(pulsar_function.PulsarFunction): + def __init__(self): + pass + + def process(self, input, context): + context.get_logger().info(input) + return input + '!' diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index dd52efd..badee94 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -69,6 +69,9 @@ public class JavaInstanceMain { @Parameter(names = "--output_serde_classname", description = "Output SerDe\n") protected String outputSerdeClassName; + @Parameter(names = "--log_topic", description = "Log Topic") + protected String logTopic; + @Parameter(names = "--processing_guarantees", description = "Processing Guarantees\n", required = true) protected FunctionConfig.ProcessingGuarantees processingGuarantees; @@ -137,6 +140,9 @@ public class JavaInstanceMain { if (sinkTopicName != null) { functionConfigBuilder.setOutput(sinkTopicName); } + if (logTopic != null) { + functionConfigBuilder.setLogTopic(logTopic); + } functionConfigBuilder.setProcessingGuarantees(processingGuarantees); if (autoAck.equals("true")) { functionConfigBuilder.setAutoAck(true); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 3460c04..c1d7b7f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -106,6 +106,11 @@ class ProcessRuntime implements Runtime { args.add(instanceConfig.getFunctionConfig().getName()); args.add("--function_classname"); args.add(instanceConfig.getFunctionConfig().getClassName()); + if (instanceConfig.getFunctionConfig().getLogTopic() != null && + !instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) { + args.add("--log_topic"); + args.add(instanceConfig.getFunctionConfig().getLogTopic()); + } if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 0) { String sourceTopicString = ""; String inputSerdeClassNameString = ""; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 5b149af..31e6422 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -79,6 +79,7 @@ public class ProcessRuntimeTest { functionConfigBuilder.addInputs(TEST_NAME + "-source2"); functionConfigBuilder.setOutput(TEST_NAME + "-sink"); functionConfigBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer"); + functionConfigBuilder.setLogTopic(TEST_NAME + "-log"); return functionConfigBuilder.build(); } @@ -100,7 +101,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 39); + assertEquals(args.size(), 41); args.remove(args.size() - 1); String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " + "-Dpulsar.log.dir=" + logDirectory + " -Dpulsar.log.file=" + config.getFunctionConfig().getName() @@ -111,6 +112,7 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionConfig().getNamespace() + " --name " + config.getFunctionConfig().getName() + " --function_classname " + config.getFunctionConfig().getClassName() + + " --log_topic " + config.getFunctionConfig().getLogTopic() + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2" + " --auto_ack false" + " --sink_topic " + config.getFunctionConfig().getOutput() @@ -127,7 +129,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 38); + assertEquals(args.size(), 40); args.remove(args.size() - 1); String expectedArgs = "python " + pythonInstanceFile + " --py " + userJarFile + " --logging_directory " @@ -137,6 +139,7 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionConfig().getNamespace() + " --name " + config.getFunctionConfig().getName() + " --function_classname " + config.getFunctionConfig().getClassName() + + " --log_topic " + config.getFunctionConfig().getLogTopic() + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2" + " --auto_ack false" + " --sink_topic " + config.getFunctionConfig().getOutput() -- To stop receiving notification emails like this one, please contact si...@apache.org.