This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3f8bd00 fixing/improving logging for function instance (#2586) 3f8bd00 is described below commit 3f8bd00d961f1f56233c629746c11cbd711bf06e Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Tue Sep 18 23:30:39 2018 -0700 fixing/improving logging for function instance (#2586) ### Motivation Logging for the Java function instance (process mode and localrun) is not working well. Log files are created at inappropriate locations for logs not captured by the routing appender, and the functions' BookKeeper logs are missing because we are not using the shaded classpath. The best approach for logging is to put all the logs from the Java instance in a single log and not worry about the routing, which is only useful when running in threaded mode. --- .../pulsar/functions/runtime/JavaInstanceMain.java | 9 --- .../pulsar/functions/runtime/ProcessRuntime.java | 22 +++--- .../pulsar/functions/runtime/ThreadRuntime.java | 4 +- .../src/main/resources/java_instance_log4j2.yml | 78 +++------------------- .../functions/runtime/ProcessRuntimeTest.java | 4 +- 5 files changed, 31 insertions(+), 86 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 11da7c3..083686b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -22,27 +22,18 @@ package org.apache.pulsar.functions.runtime; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.converters.StringConverter; -import com.google.gson.Gson; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import io.grpc.Server; import io.grpc.ServerBuilder; import 
io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; -import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index a978a09..c5159b0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -22,32 +22,32 @@ package org.apache.pulsar.functions.runtime; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.gson.Gson; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import 
org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry; import java.io.InputStream; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.TimerTask; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + /** * A function container implemented using java thread. */ @@ -97,8 +97,14 @@ class ProcessRuntime implements Runtime { // by the child process and manually added to classpath args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile)); args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml"); - args.add("-Dpulsar.log.dir=" + logDirectory); - args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionDetails().getName()); + args.add("-Dpulsar.function.log.dir=" + String.format( + "%s/%s", + logDirectory, + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()))); + args.add("-Dpulsar.function.log.file=" + String.format( + "%s-%s", + instanceConfig.getFunctionDetails().getName(), + instanceConfig.getInstanceId())); if (instanceConfig.getFunctionDetails().getResources() != null) { Function.Resources resources = instanceConfig.getFunctionDetails().getResources(); if (resources.getRam() != 0) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 5cb8ce0..d752ed7 100644 --- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -72,7 +72,9 @@ class ThreadRuntime implements Runtime { public void start() { log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, - FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); + String.format("%s-%s", + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()), + instanceConfig.getInstanceId())); this.fnThread.start(); } diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml index 25583a6..df25367 100644 --- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml +++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml @@ -23,16 +23,10 @@ Configuration: Properties: Property: - - name: "pulsar.log.dir" - value: "logs" - - name: "pulsar.log.file" - value: "pulsar-functions.log" - name: "pulsar.log.appender" - value: "RoutingAppender" + value: "RollingFile" - name: "pulsar.log.level" value: "info" - - name: "pulsar.routing.appender.default" - value: "RollingFile" - name: "bk.log.level" value: "info" @@ -48,8 +42,8 @@ Configuration: # Rolling file appender configuration RollingFile: name: RollingFile - fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}" - filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz" + fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log" + filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz" immediateFlush: false PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" @@ -66,19 +60,19 @@ Configuration: # Delete file older than 30days 
DefaultRolloverStrategy: Delete: - basePath: ${sys:pulsar.log.dir} + basePath: ${sys:pulsar.function.log.dir} maxDepth: 2 IfFileName: - glob: "*/${sys:pulsar.log.file}*log.gz" + glob: "*/${sys:pulsar.function.log.file}*log.gz" IfLastModified: age: 30d # Rolling file appender configuration for bk RollingRandomAccessFile: name: BkRollingFile - fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk" - filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz" - immediateFlush: true + fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk" + filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz" + immediateFlush: false PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" Policies: @@ -94,67 +88,17 @@ Configuration: # Delete file older than 30days DefaultRolloverStrategy: Delete: - basePath: ${sys:pulsar.log.dir} + basePath: ${sys:pulsar.function.log.dir} maxDepth: 2 IfFileName: - glob: "*/${sys:pulsar.log.file}.bk*log.gz" + glob: "*/${sys:pulsar.function.log.file}.bk*log.gz" IfLastModified: age: 30d - # Routing - Routing: - name: RoutingAppender - Routes: - pattern: "$${ctx:function}" - Route: - - - Routing: - name: InstanceRoutingAppender - Routes: - pattern: "$${ctx:instance}" - Route: - - - RollingFile: - name: "Rolling-${ctx:function}" - fileName : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}.log" - filePattern : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz" - PatternLayout: - Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance-%X{instance}] %logger{1} - %msg%n" - Policies: - TimeBasedTriggeringPolicy: - interval: 1 - modulate: true - SizeBasedTriggeringPolicy: - size: "20MB" - # Trigger every day at midnight that also scan - # roll-over strategy that deletes older file - CronTriggeringPolicy: - schedule: "0 0 0 * * ?" 
- # Delete file older than 30days - DefaultRolloverStrategy: - Delete: - basePath: ${sys:pulsar.log.dir} - maxDepth: 2 - IfFileName: - glob: "*/${sys:pulsar.log.file}*log.gz" - IfLastModified: - age: 30d - - ref: "${sys:pulsar.routing.appender.default}" - key: "${ctx:function}" - - ref: "${sys:pulsar.routing.appender.default}" - key: "${ctx:function}" - Loggers: Logger: - name: org.apache.bookkeeper - level: "${sys:bk.log.level}" - additivity: false - AppenderRef: - - ref: BkRollingFile - - Logger: - name: org.apache.distributedlog + name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper level: "${sys:bk.log.level}" additivity: false AppenderRef: diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 5a6517e..6f1e2f3 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -119,7 +120,8 @@ public class ProcessRuntimeTest { String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " - + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName() + + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + 
FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails()) + + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() + " org.apache.pulsar.functions.runtime.JavaInstanceMain" + " --jar " + userJarFile + " --instance_id " + config.getInstanceId() + " --function_id " + config.getFunctionId()