This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 20320c7 Fix null pointer when getting function instance metrics. (#7010) 20320c7 is described below commit 20320c7ca15498f5655668e99cce286be381c76d Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri May 22 11:32:17 2020 -0700 Fix null pointer when getting function instance metrics. (#7010) * Fix null pointer when getting function instance metrics. * Made more functions sync * Made the remaining public interface synchronized * Made stats class public method synchronized so they are thread safe. * Made setup synchronized so that it and close won't run together * Undo making stats sync until we resolve differences * Incorporated feedback Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com> --- .../functions/instance/JavaInstanceRunnable.java | 75 ++++++++++++++-------- .../functions/runtime/thread/ThreadRuntime.java | 2 +- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 0fa4fc6..892c7e5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,6 +24,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; + +import java.io.IOException; import java.util.concurrent.CompletableFuture; import lombok.AccessLevel; import lombok.Getter; @@ -102,9 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // provide tables for storing states private final String stateStorageServiceUrl; - @Getter(AccessLevel.PACKAGE) private StorageClient storageClient; - @Getter(AccessLevel.PACKAGE) private Table<ByteBuf, ByteBuf> stateTable; private JavaInstance javaInstance; @@ -112,7 +112,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Throwable deathException; // function stats - @Getter private ComponentStatsManager stats; private Record<?> currentRecord; @@ -178,7 +177,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { /** * NOTE: this method should be called in the instance thread, in order to make class loading work. */ - JavaInstance setupJavaInstance() throws Exception { + synchronized private void setup() throws Exception { + + this.instanceCache = InstanceCache.getInstanceCache(); + + if (this.collectorRegistry == null) { + this.collectorRegistry = new CollectorRegistry(); + } + this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, + this.instanceCache.getScheduledExecutorService(), + this.componentType); + // initialize the thread context ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())); ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); @@ -218,7 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // start any log topic handler setupLogHandler(); - return new JavaInstance(contextImpl, object, instanceConfig); + javaInstance = new JavaInstance(contextImpl, object, instanceConfig); } ContextImpl setupContext() { @@ -234,16 +243,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Override public void run() { try { - this.instanceCache = InstanceCache.getInstanceCache(); - - if (this.collectorRegistry == null) { - this.collectorRegistry = new CollectorRegistry(); - } - this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, - this.instanceCache.getScheduledExecutorService(), - this.componentType); - - javaInstance = setupJavaInstance(); + setup(); + while (true) { currentRecord = readInput(); @@ -546,13 +547,32 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - public InstanceCommunication.MetricsData getAndResetMetrics() { - InstanceCommunication.MetricsData metricsData = getMetrics(); - stats.reset(); + synchronized public String getStatsAsString() throws IOException { + if (stats != null) { + return stats.getStatsAsString(); + } else { + return ""; + } + } + + // This method is synchronized because it is using the stats variable + synchronized public InstanceCommunication.MetricsData getAndResetMetrics() { + InstanceCommunication.MetricsData metricsData = internalGetMetrics(); + internalResetMetrics(); return metricsData; } - public InstanceCommunication.MetricsData getMetrics() { + // This method is synchronized because it is using the stats and javaInstance variables + synchronized public InstanceCommunication.MetricsData getMetrics() { + return internalGetMetrics(); + } + + // This method is synchronized because it is using the stats and javaInstance variables + synchronized public void resetMetrics() { + internalResetMetrics(); + } + + private InstanceCommunication.MetricsData internalGetMetrics() { InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); if (javaInstance != null) { Map<String, Double> userMetrics = javaInstance.getMetrics(); @@ -563,9 +583,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return bldr.build(); } - public void resetMetrics() { - stats.reset(); - javaInstance.resetMetrics(); + private void internalResetMetrics() { + if (stats != null) { + stats.reset(); + } + if (javaInstance != null) { + javaInstance.resetMetrics(); + } } private Builder createMetricsDataBuilder() { @@ -588,7 +612,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return bldr; } - public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { + // This method is synchronized because it is using the stats variable + synchronized public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); if (stats != null) { functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived()); @@ -643,7 +668,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { config.getRootLogger().removeAppender(logAppender.getName()); } - public void setupInput(ContextImpl contextImpl) throws Exception { + private void setupInput(ContextImpl contextImpl) throws Exception { SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource(); Object object; @@ -745,7 +770,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - public void setupOutput(ContextImpl contextImpl) throws Exception { + private void setupOutput(ContextImpl contextImpl) throws Exception { SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink(); Object object; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index ffbed6b..5cc74c5 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -175,7 +175,7 @@ public class ThreadRuntime implements Runtime { @Override public String getPrometheusMetrics() throws IOException { - return javaInstanceRunnable.getStats().getStatsAsString(); + return javaInstanceRunnable.getStatsAsString(); } @Override