This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 45b6d73 Add log dir creation in ProcessRuntime (#3114) 45b6d73 is described below commit 45b6d73ca3c2735ba3df2dbee4bbf70da6eaca5d Author: Ali Ahmed <alahmed...@gmail.com> AuthorDate: Tue Dec 4 10:38:09 2018 -0800 Add log dir creation in ProcessRuntime (#3114) --- .../pulsar/functions/runtime/ProcessRuntime.java | 21 +++++++++++++++++++++ .../pulsar/functions/runtime/RuntimeUtils.java | 12 ++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 70e7a3a..6a5f79a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -39,6 +39,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.Utils; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -72,6 +73,7 @@ class ProcessRuntime implements Runtime { private final SecretsProviderConfigurator secretsProviderConfigurator; private final String extraDependenciesDir; private static final long GRPC_TIMEOUT_SECS = 5; + private final String funcLogDir; ProcessRuntime(InstanceConfig instanceConfig, String instanceFile, @@ -88,6 +90,7 @@ class ProcessRuntime implements Runtime { this.metricsPort = Utils.findAvailablePort(); this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.secretsProviderConfigurator = secretsProviderConfigurator; + this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig); String logConfigFile = null; String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); String secretsProviderConfig = null; @@ -133,6 +136,19 @@ class ProcessRuntime implements Runtime { @Override public void start() { java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy())); + + // Note: we create the expected log folder before the function process logger attempts to create it + // This is because if multiple instances are launched they can encounter a race condition creation of the dir. + log.info("Creating function log directory {}", funcLogDir); + boolean success = createFolder(funcLogDir); + + if (!success) { + log.error("Log folder could not be created : {}", funcLogDir); + throw new RuntimeException("Log folder creation error"); + } + + log.info("Created function log directory {}", funcLogDir); + startProcess(); if (channel == null && stub == null) { channel = ManagedChannelBuilder.forAddress("127.0.0.1", instancePort) @@ -332,6 +348,11 @@ class ProcessRuntime implements Runtime { return true; } + private boolean createFolder(final String path) { + final boolean success = new File(path).mkdirs(); + return success; + } + private void tryExtractingDeathException() { InputStream errorStream = process.getErrorStream(); try { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index a5f55c5..5e09e66 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -84,10 +84,7 @@ class RuntimeUtils { args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir)); } args.add("-Dlog4j.configurationFile=" + logConfigFile); - args.add("-Dpulsar.function.log.dir=" + String.format( - "%s/%s", - logDirectory, - FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()))); + args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig)); args.add("-Dpulsar.function.log.file=" + String.format( "%s-%s", instanceConfig.getFunctionDetails().getName(), @@ -194,6 +191,13 @@ class RuntimeUtils { return args; } + public static String genFunctionLogFolder(String logDirectory, InstanceConfig instanceConfig) { + return String.format( + "%s/%s", + logDirectory, + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); + } + public static String getPrometheusMetrics(int metricsPort) throws IOException{ StringBuilder result = new StringBuilder(); URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));