This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b24ab9  Log Topic for Functions (#1356)
6b24ab9 is described below

commit 6b24ab9cbb7a4d471c05c7244b84243936cba4f6
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Wed Mar 7 13:02:33 2018 -0800

    Log Topic for Functions (#1356)
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   5 +
 .../instance/src/main/python/Function_pb2.py       | 153 +++++++++++++++------
 pulsar-functions/instance/src/main/python/log.py   |   4 +-
 .../proto/src/main/proto/Function.proto            |   1 +
 pulsar-functions/python-examples/logfunction.py    |  30 ++++
 .../pulsar/functions/runtime/JavaInstanceMain.java |   6 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |   5 +
 .../functions/runtime/ProcessRuntimeTest.java      |   7 +-
 8 files changed, 164 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 553f1c6..c8010e7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -143,6 +143,8 @@ public class CmdFunctions extends CmdBase {
         protected String inputs;
         @Parameter(names = "--output", description = "Output Topic Name")
         protected String output;
+        @Parameter(names = "--logTopic", description = "Log Topic")
+        protected String logTopic;
         @Parameter(names = "--customSerdeInputs", description = "Map of input 
topic to serde classname")
         protected String customSerdeInputString;
         @Parameter(names = "--outputSerdeClassName", description = "Output 
SerDe")
@@ -184,6 +186,9 @@ public class CmdFunctions extends CmdBase {
             if (null != output) {
                 functionConfigBuilder.setOutput(output);
             }
+            if (null != logTopic) {
+                functionConfigBuilder.setLogTopic(logTopic);
+            }
             if (null != tenant) {
                 functionConfigBuilder.setTenant(tenant);
             }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 8566806..95fcd7a 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -16,8 +16,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-
+# 
+  
 # -*- encoding: utf-8 -*-
 
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
@@ -38,10 +38,10 @@ _sym_db = _symbol_database.Default()
 
 
 DESCRIPTOR = _descriptor.FileDescriptor(
-  name='Function.proto',
+  name='pulsar-functions/proto/src/main/proto/Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xdb\x05\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x0e\n\x06inputs\x18\x0e 
\x03(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 
\x01(\t\x12H\n\x14processingGuarantees\x18\t \ [...]
+  
serialized_pb=_b('\n4pulsar-functions/proto/src/main/proto/Function.proto\x12\x05proto\"\x98\x06\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 
\x01(\t\x12H [...]
 )
 
 
@@ -53,18 +53,22 @@ _FUNCTIONCONFIG_PROCESSINGGUARANTEES = 
_descriptor.EnumDescriptor(
   file=DESCRIPTOR,
   values=[
     _descriptor.EnumValueDescriptor(
-      name='ATMOST_ONCE', index=0, number=0,
+      name='ATLEAST_ONCE', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='ATMOST_ONCE', index=1, number=1,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='ATLEAST_ONCE', index=1, number=1,
+      name='EFFECTIVELY_ONCE', index=2, number=2,
       options=None,
       type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=620,
-  serialized_end=677,
+  serialized_start=697,
+  serialized_end=776,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_PROCESSINGGUARANTEES)
 
@@ -85,8 +89,8 @@ _FUNCTIONCONFIG_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=679,
-  serialized_end=724,
+  serialized_start=778,
+  serialized_end=823,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_SUBSCRIPTIONTYPE)
 
@@ -107,8 +111,8 @@ _FUNCTIONCONFIG_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=726,
-  serialized_end=757,
+  serialized_start=825,
+  serialized_end=856,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_RUNTIME)
 
@@ -146,8 +150,8 @@ _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=511,
-  serialized_end=567,
+  serialized_start=588,
+  serialized_end=644,
 )
 
 _FUNCTIONCONFIG_USERCONFIGENTRY = _descriptor.Descriptor(
@@ -183,8 +187,8 @@ _FUNCTIONCONFIG_USERCONFIGENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=569,
-  serialized_end=618,
+  serialized_start=646,
+  serialized_end=695,
 )
 
 _FUNCTIONCONFIG = _descriptor.Descriptor(
@@ -223,29 +227,29 @@ _FUNCTIONCONFIG = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='inputs', full_name='proto.FunctionConfig.inputs', index=4,
-      number=14, type=9, cpp_type=9, label=3,
+      name='customSerdeInputs', 
full_name='proto.FunctionConfig.customSerdeInputs', index=4,
+      number=5, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='customSerdeInputs', 
full_name='proto.FunctionConfig.customSerdeInputs', index=5,
-      number=5, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
+      name='outputSerdeClassName', 
full_name='proto.FunctionConfig.outputSerdeClassName', index=5,
+      number=6, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='outputSerdeClassName', 
full_name='proto.FunctionConfig.outputSerdeClassName', index=6,
-      number=6, type=9, cpp_type=9, label=1,
+      name='output', full_name='proto.FunctionConfig.output', index=6,
+      number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='output', full_name='proto.FunctionConfig.output', index=7,
-      number=7, type=9, cpp_type=9, label=1,
+      name='logTopic', full_name='proto.FunctionConfig.logTopic', index=7,
+      number=8, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -285,6 +289,20 @@ _FUNCTIONCONFIG = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='inputs', full_name='proto.FunctionConfig.inputs', index=13,
+      number=14, type=9, cpp_type=9, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='parallelism', full_name='proto.FunctionConfig.parallelism', 
index=14,
+      number=15, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -300,8 +318,8 @@ _FUNCTIONCONFIG = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=26,
-  serialized_end=757,
+  serialized_start=64,
+  serialized_end=856,
 )
 
 
@@ -331,8 +349,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=759,
-  serialized_end=805,
+  serialized_start=858,
+  serialized_end=904,
 )
 
 
@@ -383,8 +401,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=808,
-  serialized_end=967,
+  serialized_start=907,
+  serialized_end=1066,
 )
 
 
@@ -421,8 +439,46 @@ _SNAPSHOT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=969,
-  serialized_end=1064,
+  serialized_start=1068,
+  serialized_end=1163,
+)
+
+
+_INSTANCE = _descriptor.Descriptor(
+  name='Instance',
+  full_name='proto.Instance',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='functionMetaData', full_name='proto.Instance.functionMetaData', 
index=0,
+      number=1, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='instanceId', full_name='proto.Instance.instanceId', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1165,
+  serialized_end=1246,
 )
 
 
@@ -434,7 +490,7 @@ _ASSIGNMENT = _descriptor.Descriptor(
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='functionMetaData', full_name='proto.Assignment.functionMetaData', 
index=0,
+      name='instance', full_name='proto.Assignment.instance', index=0,
       number=1, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
@@ -459,8 +515,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1066,
-  serialized_end=1147,
+  serialized_start=1248,
+  serialized_end=1313,
 )
 
 _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONCONFIG
@@ -476,11 +532,13 @@ _FUNCTIONCONFIG_RUNTIME.containing_type = _FUNCTIONCONFIG
 _FUNCTIONMETADATA.fields_by_name['functionConfig'].message_type = 
_FUNCTIONCONFIG
 _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = 
_PACKAGELOCATIONMETADATA
 _SNAPSHOT.fields_by_name['functionMetaDataList'].message_type = 
_FUNCTIONMETADATA
-_ASSIGNMENT.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
+_INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
+_ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
 DESCRIPTOR.message_types_by_name['FunctionConfig'] = _FUNCTIONCONFIG
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = 
_PACKAGELOCATIONMETADATA
 DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
 DESCRIPTOR.message_types_by_name['Snapshot'] = _SNAPSHOT
+DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
 DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
@@ -488,19 +546,19 @@ FunctionConfig = 
_reflection.GeneratedProtocolMessageType('FunctionConfig', (_me
 
   CustomSerdeInputsEntry = 
_reflection.GeneratedProtocolMessageType('CustomSerdeInputsEntry', 
(_message.Message,), dict(
     DESCRIPTOR = _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY,
-    __module__ = 'Function_pb2'
+    __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
     # 
@@protoc_insertion_point(class_scope:proto.FunctionConfig.CustomSerdeInputsEntry)
     ))
   ,
 
   UserConfigEntry = 
_reflection.GeneratedProtocolMessageType('UserConfigEntry', 
(_message.Message,), dict(
     DESCRIPTOR = _FUNCTIONCONFIG_USERCONFIGENTRY,
-    __module__ = 'Function_pb2'
+    __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
     # 
@@protoc_insertion_point(class_scope:proto.FunctionConfig.UserConfigEntry)
     ))
   ,
   DESCRIPTOR = _FUNCTIONCONFIG,
-  __module__ = 'Function_pb2'
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionConfig)
   ))
 _sym_db.RegisterMessage(FunctionConfig)
@@ -509,28 +567,35 @@ _sym_db.RegisterMessage(FunctionConfig.UserConfigEntry)
 
 PackageLocationMetaData = 
_reflection.GeneratedProtocolMessageType('PackageLocationMetaData', 
(_message.Message,), dict(
   DESCRIPTOR = _PACKAGELOCATIONMETADATA,
-  __module__ = 'Function_pb2'
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.PackageLocationMetaData)
   ))
 _sym_db.RegisterMessage(PackageLocationMetaData)
 
 FunctionMetaData = 
_reflection.GeneratedProtocolMessageType('FunctionMetaData', 
(_message.Message,), dict(
   DESCRIPTOR = _FUNCTIONMETADATA,
-  __module__ = 'Function_pb2'
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionMetaData)
   ))
 _sym_db.RegisterMessage(FunctionMetaData)
 
 Snapshot = _reflection.GeneratedProtocolMessageType('Snapshot', 
(_message.Message,), dict(
   DESCRIPTOR = _SNAPSHOT,
-  __module__ = 'Function_pb2'
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.Snapshot)
   ))
 _sym_db.RegisterMessage(Snapshot)
 
+Instance = _reflection.GeneratedProtocolMessageType('Instance', 
(_message.Message,), dict(
+  DESCRIPTOR = _INSTANCE,
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.Instance)
+  ))
+_sym_db.RegisterMessage(Instance)
+
 Assignment = _reflection.GeneratedProtocolMessageType('Assignment', 
(_message.Message,), dict(
   DESCRIPTOR = _ASSIGNMENT,
-  __module__ = 'Function_pb2'
+  __module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.Assignment)
   ))
 _sym_db.RegisterMessage(Assignment)
diff --git a/pulsar-functions/instance/src/main/python/log.py 
b/pulsar-functions/instance/src/main/python/log.py
index 85b2104..18bde78 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -37,6 +37,7 @@ date_format = "%Y-%m-%d %H:%M:%S %z"
 
 class LogTopicHandler(logging.Handler):
   def __init__(self, topic_name, pulsar_client):
+    logging.Handler.__init__(self)
     Log.info("Setting up producer for log topic %s" % topic_name)
     self.producer = pulsar_client.create_producer(
       str(topic_name),
@@ -46,7 +47,8 @@ class LogTopicHandler(logging.Handler):
       compression_type=pulsar._pulsar.CompressionType.LZ4)
 
   def emit(self, record):
-    self.producer.send_async(record)
+    msg = self.format(record)
+    self.producer.send_async(str(msg), None)
 
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 2beab12..4b74dc7 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -45,6 +45,7 @@ message FunctionConfig {
     map<string, string> customSerdeInputs = 5;
     string outputSerdeClassName = 6;
     string output = 7;
+    string logTopic = 8;
     ProcessingGuarantees processingGuarantees = 9;
     map<string,string> userConfig = 10;
     SubscriptionType subscriptionType = 11;
diff --git a/pulsar-functions/python-examples/logfunction.py 
b/pulsar-functions/python-examples/logfunction.py
new file mode 100755
index 0000000..4c59124
--- /dev/null
+++ b/pulsar-functions/python-examples/logfunction.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from pulsarfunction import pulsar_function
+
+class LogFunction(pulsar_function.PulsarFunction):
+  def __init__(self):
+    pass
+
+  def process(self, input, context):
+    context.get_logger().info(input)
+    return input + '!'
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index dd52efd..badee94 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -69,6 +69,9 @@ public class JavaInstanceMain {
     @Parameter(names = "--output_serde_classname", description = "Output 
SerDe\n")
     protected String outputSerdeClassName;
 
+    @Parameter(names = "--log_topic", description = "Log Topic")
+    protected String logTopic;
+
     @Parameter(names = "--processing_guarantees", description = "Processing 
Guarantees\n", required = true)
     protected FunctionConfig.ProcessingGuarantees processingGuarantees;
 
@@ -137,6 +140,9 @@ public class JavaInstanceMain {
         if (sinkTopicName != null) {
             functionConfigBuilder.setOutput(sinkTopicName);
         }
+        if (logTopic != null) {
+            functionConfigBuilder.setLogTopic(logTopic);
+        }
         functionConfigBuilder.setProcessingGuarantees(processingGuarantees);
         if (autoAck.equals("true")) {
             functionConfigBuilder.setAutoAck(true);
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3460c04..c1d7b7f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -106,6 +106,11 @@ class ProcessRuntime implements Runtime {
         args.add(instanceConfig.getFunctionConfig().getName());
         args.add("--function_classname");
         args.add(instanceConfig.getFunctionConfig().getClassName());
+        if (instanceConfig.getFunctionConfig().getLogTopic() != null &&
+            !instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) {
+            args.add("--log_topic");
+            args.add(instanceConfig.getFunctionConfig().getLogTopic());
+        }
         if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 
0) {
             String sourceTopicString = "";
             String inputSerdeClassNameString = "";
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 5b149af..31e6422 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -79,6 +79,7 @@ public class ProcessRuntimeTest {
         functionConfigBuilder.addInputs(TEST_NAME + "-source2");
         functionConfigBuilder.setOutput(TEST_NAME + "-sink");
         
functionConfigBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
+        functionConfigBuilder.setLogTopic(TEST_NAME + "-log");
         return functionConfigBuilder.build();
     }
 
@@ -100,7 +101,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 39);
+        assertEquals(args.size(), 41);
         args.remove(args.size() - 1);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " 
-Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + " -Dpulsar.log.file=" + 
config.getFunctionConfig().getName()
@@ -111,6 +112,7 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionConfig().getNamespace()
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + 
config.getFunctionConfig().getClassName()
+                + " --log_topic " + config.getFunctionConfig().getLogTopic()
                 + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + 
"-source2"
                 + " --auto_ack false"
                 + " --sink_topic " + config.getFunctionConfig().getOutput()
@@ -127,7 +129,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 38);
+        assertEquals(args.size(), 40);
         args.remove(args.size() - 1);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
@@ -137,6 +139,7 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionConfig().getNamespace()
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + 
config.getFunctionConfig().getClassName()
+                + " --log_topic " + config.getFunctionConfig().getLogTopic()
                 + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + 
"-source2"
                 + " --auto_ack false"
                 + " --sink_topic " + config.getFunctionConfig().getOutput()

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to