This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14fc5d9  Schedule task to update function stats separately (#2128)
14fc5d9 is described below

commit 14fc5d956c0044743984e839cce0acdd9d7f5336
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Jul 12 14:54:49 2018 -0700

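    Schedule task to update function stats separately (#2128)
    
    Keep the previous stats sample, so that metrics can be read (GetMetrics)
    without resetting them, and rotate samples with a separate ResetMetrics call.
    
    Make the metrics update task configurable via `metricsSamplingPeriodSec`
    (set to 60 in conf/functions_worker.yml).
    
    Fix test.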
---
 conf/functions_worker.yml                          |  1 +
 .../pulsar/functions/instance/ContextImpl.java     | 19 +++++++--
 .../pulsar/functions/instance/FunctionStats.java   | 25 +++++++++++-
 .../pulsar/functions/instance/JavaInstance.java    |  8 ++++
 .../functions/instance/JavaInstanceRunnable.java   | 45 +++++++++++++++------
 .../src/main/python/InstanceCommunication_pb2.py   | 24 +++++++++--
 .../main/python/InstanceCommunication_pb2_grpc.py  | 34 ++++++++++++++++
 .../instance/src/main/python/contextimpl.py        | 23 ++++++++---
 .../instance/src/main/python/python_instance.py    | 46 +++++++++++++++++-----
 .../instance/src/main/python/server.py             |  9 +++++
 .../src/main/proto/InstanceCommunication.proto     |  2 +
 .../pulsar/functions/runtime/JavaInstanceMain.java | 30 ++++++++++++++
 .../pulsar/functions/runtime/ProcessRuntime.java   | 44 +++++++++++++++++++++
 .../apache/pulsar/functions/runtime/Runtime.java   |  4 ++
 .../pulsar/functions/runtime/ThreadRuntime.java    | 12 ++++++
 .../functions/worker/FunctionRuntimeManager.java   | 18 +++++++++
 .../functions/worker/FunctionsStatsGenerator.java  |  4 +-
 .../pulsar/functions/worker/WorkerConfig.java      |  3 +-
 .../pulsar/functions/worker/WorkerService.java     | 18 +++++++++
 .../worker/FunctionStatsGeneratorTest.java         |  3 +-
 20 files changed, 336 insertions(+), 36 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 73edbfb..58bcf1d 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,3 +44,4 @@ rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
+metricsSamplingPeriodSec: 60
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 0a1b965..b41d6f4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -91,6 +91,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
     }
 
+    private ConcurrentMap<String, AccumulatedMetricDatum> 
currentAccumulatedMetrics;
     private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics;
 
     private Map<String, Producer> publishProducers;
@@ -110,6 +111,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         this.logger = logger;
         this.pulsarClient = client;
         this.classLoader = classLoader;
+        this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
@@ -324,11 +326,23 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
 
     @Override
     public void recordMetric(String metricName, double value) {
-        accumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
-        accumulatedMetrics.get(metricName).update(value);
+        currentAccumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
+        currentAccumulatedMetrics.get(metricName).update(value);
     }
 
     public MetricsData getAndResetMetrics() {
+        MetricsData retval = getMetrics();
+        resetMetrics();
+        return retval;
+    }
+
+    public void resetMetrics() {
+        this.accumulatedMetrics.clear();
+        this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
+        this.currentAccumulatedMetrics.clear();
+    }
+    
+    public MetricsData getMetrics() {
         MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
         for (String metricName : accumulatedMetrics.keySet()) {
             MetricsData.DataDigest.Builder bldr = 
MetricsData.DataDigest.newBuilder();
@@ -339,7 +353,6 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
             metricsDataBuilder.putMetrics(metricName, bldr.build());
         }
         MetricsData retval = metricsDataBuilder.build();
-        accumulatedMetrics.clear();
         return retval;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
index 899777c..c45837b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java
@@ -102,13 +102,35 @@ public class FunctionStats {
                 return totalLatencyMs / totalSuccessfullyProcessed;
             }
         }
+        
+        public void update(Stats stats) {
+            if (stats == null) {
+                return;
+            }
+            this.totalProcessed = stats.totalProcessed;
+            this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed;
+            this.totalUserExceptions = stats.totalUserExceptions;
+            this.latestUserExceptions.clear();
+            this.latestSystemExceptions.clear();
+            this.totalDeserializationExceptions.clear();
+            this.latestUserExceptions.addAll(stats.latestUserExceptions);
+            this.latestSystemExceptions.addAll(stats.latestSystemExceptions);
+            
this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions);
+            this.totalSystemExceptions = stats.totalSystemExceptions;
+            this.latestSystemExceptions = stats.latestSystemExceptions;
+            this.totalSerializationExceptions = 
stats.totalSerializationExceptions;
+            this.totalLatencyMs = stats.totalLatencyMs;
+            this.lastInvocationTime = stats.lastInvocationTime;
+        }
     }
 
     private Stats currentStats;
     private Stats totalStats;
-
+    private Stats stats;
+    
     public FunctionStats() {
         currentStats = new Stats();
+        stats = new Stats();
         totalStats = new Stats();
     }
 
@@ -138,6 +160,7 @@ public class FunctionStats {
         totalStats.incrementSerializationExceptions();
     }
     public void resetCurrent() {
+        stats.update(currentStats);
         currentStats.reset();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 496b43f..42ade3f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -76,4 +76,12 @@ public class JavaInstance implements AutoCloseable {
     public InstanceCommunication.MetricsData getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
+
+    public void resetMetrics() {
+        context.resetMetrics();
+    }
+
+    public InstanceCommunication.MetricsData getMetrics() {
+        return context.getMetrics();
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d2f832d..ba0de38 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -52,9 +52,9 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.sink.PulsarSink;
@@ -389,16 +389,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     public InstanceCommunication.MetricsData getAndResetMetrics() {
-        InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
-        addSystemMetrics("__total_processed__", 
stats.getCurrentStats().getTotalProcessed(), bldr);
-        addSystemMetrics("__total_successfully_processed__", 
stats.getCurrentStats().getTotalSuccessfullyProcessed(), bldr);
-        addSystemMetrics("__total_system_exceptions__", 
stats.getCurrentStats().getTotalSystemExceptions(), bldr);
-        addSystemMetrics("__total_user_exceptions__", 
stats.getCurrentStats().getTotalUserExceptions(), bldr);
-        
stats.getCurrentStats().getTotalDeserializationExceptions().forEach((topic, 
count) -> {
-            addSystemMetrics("__total_deserialization_exceptions__" + topic, 
count, bldr);
-        });
-        addSystemMetrics("__total_serialization_exceptions__", 
stats.getCurrentStats().getTotalSerializationExceptions(), bldr);
-        addSystemMetrics("__avg_latency_ms__", 
stats.getCurrentStats().computeLatency(), bldr);
+        InstanceCommunication.MetricsData.Builder bldr = 
createMetricsDataBuilder();
         stats.resetCurrent();
         if (javaInstance != null) {
             InstanceCommunication.MetricsData userMetrics =  
javaInstance.getAndResetMetrics();
@@ -409,6 +400,38 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         return bldr.build();
     }
 
+    public InstanceCommunication.MetricsData getMetrics() {
+        InstanceCommunication.MetricsData.Builder bldr = 
createMetricsDataBuilder();
+        if (javaInstance != null) {
+            InstanceCommunication.MetricsData userMetrics =  
javaInstance.getMetrics();
+            if (userMetrics != null) {
+                bldr.putAllMetrics(userMetrics.getMetricsMap());
+            }
+        }
+        return bldr.build();
+    }
+
+    public void resetMetrics() {
+        stats.resetCurrent();
+        javaInstance.resetMetrics();
+    }
+    
+    private Builder createMetricsDataBuilder() {
+        InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
+        addSystemMetrics("__total_processed__", 
stats.getStats().getTotalProcessed(), bldr);
+        addSystemMetrics("__total_successfully_processed__", 
stats.getStats().getTotalSuccessfullyProcessed(),
+                bldr);
+        addSystemMetrics("__total_system_exceptions__", 
stats.getStats().getTotalSystemExceptions(), bldr);
+        addSystemMetrics("__total_user_exceptions__", 
stats.getStats().getTotalUserExceptions(), bldr);
+        stats.getStats().getTotalDeserializationExceptions().forEach((topic, 
count) -> {
+            addSystemMetrics("__total_deserialization_exceptions__" + topic, 
count, bldr);
+        });
+        addSystemMetrics("__total_serialization_exceptions__",
+                stats.getStats().getTotalSerializationExceptions(), bldr);
+        addSystemMetrics("__avg_latency_ms__", 
stats.getStats().computeLatency(), bldr);
+        return bldr;
+    }
+
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = 
InstanceCommunication.FunctionStatus.newBuilder();
         
functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed());
diff --git 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index b4e814d..49c77d6 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
+  
serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01
 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 
\x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 
\x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 
\n\x18numSuccessfullyProcessed\x18\x05 
\x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 
\x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 
\x03(\x0b\x32*.proto.FunctionStatus.Ex [...]
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -513,7 +513,7 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
   index=0,
   options=None,
   serialized_start=1068,
-  serialized_end=1290,
+  serialized_end=1416,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
@@ -534,9 +534,27 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor(
     options=None,
   ),
   _descriptor.MethodDescriptor(
+    name='ResetMetrics',
+    full_name='proto.InstanceControl.ResetMetrics',
+    index=2,
+    containing_service=None,
+    input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    output_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
+    name='GetMetrics',
+    full_name='proto.InstanceControl.GetMetrics',
+    index=3,
+    containing_service=None,
+    input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    output_type=_METRICSDATA,
+    options=None,
+  ),
+  _descriptor.MethodDescriptor(
     name='HealthCheck',
     full_name='proto.InstanceControl.HealthCheck',
-    index=2,
+    index=4,
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
     output_type=_HEALTHCHECKRESULT,
diff --git 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index 575f994..21730e1 100644
--- 
a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++ 
b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -44,6 +44,16 @@ class InstanceControlStub(object):
         
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
         
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
         )
+    self.ResetMetrics = channel.unary_unary(
+        '/proto.InstanceControl/ResetMetrics',
+        
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+        
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+        )
+    self.GetMetrics = channel.unary_unary(
+        '/proto.InstanceControl/GetMetrics',
+        
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+        
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
+        )
     self.HealthCheck = channel.unary_unary(
         '/proto.InstanceControl/HealthCheck',
         
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
@@ -69,6 +79,20 @@ class InstanceControlServicer(object):
     context.set_details('Method not implemented!')
     raise NotImplementedError('Method not implemented!')
 
+  def ResetMetrics(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def GetMetrics(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
   def HealthCheck(self, request, context):
     # missing associated documentation comment in .proto file
     pass
@@ -89,6 +113,16 @@ def add_InstanceControlServicer_to_server(servicer, server):
           
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
           
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
       ),
+      'ResetMetrics': grpc.unary_unary_rpc_method_handler(
+          servicer.ResetMetrics,
+          
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+          
response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+      ),
+      'GetMetrics': grpc.unary_unary_rpc_method_handler(
+          servicer.GetMetrics,
+          
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+          
response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
+      ),
       'HealthCheck': grpc.unary_unary_rpc_method_handler(
           servicer.HealthCheck,
           
request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 9f4a34f..f0f9d0f 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -54,6 +54,7 @@ class ContextImpl(pulsar.Context):
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
+    self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
     self.publish_serializers = {}
@@ -107,9 +108,9 @@ class ContextImpl(pulsar.Context):
     return self.user_config
 
   def record_metric(self, metric_name, metric_value):
-    if not metric_name in self.accumulated_metrics:
-      self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
-    self.accumulated_metrics[metric_name].update(metric_value)
+    if not metric_name in self.current_accumulated_metrics:
+      self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
+    self.current_accumulated_metrics[metric_name].update(metric_value)
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -143,6 +144,18 @@ class ContextImpl(pulsar.Context):
     self.consumers[topic].acknowledge(msgid)
 
   def get_and_reset_metrics(self):
+    metrics = self.get_metrics()
+    # TODO(sanjeev):- Make this thread safe
+    self.reset_metrics()
+    return metrics
+
+  def reset_metrics(self):
+    # TODO: Make it thread safe
+    self.accumulated_metrics.clear()
+    self.accumulated_metrics.update(self.current_accumulated_metrics)
+    self.current_accumulated_metrics.clear()
+
+  def get_metrics(self):
     metrics = InstanceCommunication_pb2.MetricsData()
     for metric_name, accumulated_metric in self.accumulated_metrics.items():
       m = InstanceCommunication_pb2.MetricsData.DataDigest()
@@ -151,6 +164,4 @@ class ContextImpl(pulsar.Context):
       m.max = accumulated_metric.max
       m.min = accumulated_metric.min
       metrics.metrics[metric_name] = m
-    # TODO(sanjeev):- Make this thread safe
-    self.accumulated_metrics.clear()
-    return metrics
+    return metrics
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 7120249..70b78f0 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -97,6 +97,22 @@ class Stats(object):
     else:
       return self.latency / self.nsuccessfullyprocessed
 
+  def update(self, object):
+    self.nprocessed = object.nprocessed
+    self.nsuccessfullyprocessed = object.nsuccessfullyprocessed
+    self.nuserexceptions = object.nuserexceptions
+    self.nsystemexceptions = object.nsystemexceptions
+    self.nserialization_exceptions = object.nserialization_exceptions
+    self.latency = object.latency
+    self.lastinvocationtime = object.lastinvocationtime
+    self.latestuserexceptions = []
+    self.latestsystemexceptions = []
+    self.ndeserialization_exceptions.clear()
+    self.latestuserexceptions.append(object.latestuserexceptions)
+    self.latestsystemexceptions.append(object.latestsystemexceptions)
+    self.ndeserialization_exceptions.update(object.ndeserialization_exceptions)
+    
+
 class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, user_code, log_topic, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
@@ -119,6 +135,7 @@ class PythonInstance(object):
     self.contextimpl = None
     self.total_stats = Stats()
     self.current_stats = Stats()
+    self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
 
@@ -278,19 +295,30 @@ class PythonInstance(object):
 
   def get_and_reset_metrics(self):
     # First get any user metrics
-    metrics = self.contextimpl.get_and_reset_metrics()
+    metrics = self.get_metrics()
+    self.reset_metrics()
+    return metrics
+
+  def reset_metrics(self):
+    self.stats.update(self.current_stats)
+    self.current_stats.reset()
+    self.contextimpl.reset_metrics()
+
+  def get_metrics(self):
+    # First get any user metrics
+    metrics = self.contextimpl.get_metrics()
     # Now add system metrics as well
-    self.add_system_metrics("__total_processed__", 
self.current_stats.nprocessed, metrics)
-    self.add_system_metrics("__total_successfully_processed__", 
self.current_stats.nsuccessfullyprocessed, metrics)
-    self.add_system_metrics("__total_system_exceptions__", 
self.current_stats.nsystemexceptions, metrics)
-    self.add_system_metrics("__total_user_exceptions__", 
self.current_stats.nuserexceptions, metrics)
-    for (topic, metric) in 
self.current_stats.ndeserialization_exceptions.items():
+    self.add_system_metrics("__total_processed__", self.stats.nprocessed, 
metrics)
+    self.add_system_metrics("__total_successfully_processed__", 
self.stats.nsuccessfullyprocessed, metrics)
+    self.add_system_metrics("__total_system_exceptions__", 
self.stats.nsystemexceptions, metrics)
+    self.add_system_metrics("__total_user_exceptions__", 
self.stats.nuserexceptions, metrics)
+    for (topic, metric) in self.stats.ndeserialization_exceptions.items():
       self.add_system_metrics("__total_deserialization_exceptions__" + topic, 
metric, metrics)
-    self.add_system_metrics("__total_serialization_exceptions__", 
self.current_stats.nserialization_exceptions, metrics)
-    self.add_system_metrics("__avg_latency_ms__", 
self.current_stats.compute_latency(), metrics)
-    self.current_stats.reset()
+    self.add_system_metrics("__total_serialization_exceptions__", 
self.stats.nserialization_exceptions, metrics)
+    self.add_system_metrics("__avg_latency_ms__", 
self.stats.compute_latency(), metrics)
     return metrics
 
+
   def add_system_metrics(self, metric_name, value, metrics):
     metrics.metrics[metric_name].count = value
     metrics.metrics[metric_name].sum = value
diff --git a/pulsar-functions/instance/src/main/python/server.py 
b/pulsar-functions/instance/src/main/python/server.py
index 193e6ab..611a737 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -42,6 +42,15 @@ class 
InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr
     Log.info("Came in GetAndResetMetrics")
     return self.pyinstance.get_and_reset_metrics()
 
+  def ResetMetrics(self, request, context):
+    Log.info("Came in ResetMetrics")
+    self.pyinstance.reset_metrics()
+    return request
+
+  def GetMetrics(self, request, context):
+    Log.info("Came in GetMetrics")
+    return self.pyinstance.get_metrics()
+
   def HealthCheck(self, request, context):
     return self.pyinstance.health_check()
 
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 8db742b..ed8c95a 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -72,5 +72,7 @@ message HealthCheckResult {
 service InstanceControl {
     rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatus) {}
     rpc GetAndResetMetrics(google.protobuf.Empty) returns (MetricsData) {}
+    rpc ResetMetrics(google.protobuf.Empty) returns (google.protobuf.Empty) {}
+    rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
     rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 5a63ec3..0c10fdb 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -349,6 +349,36 @@ public class JavaInstanceMain implements AutoCloseable {
         }
 
         @Override
+        public void getMetrics(com.google.protobuf.Empty request,
+                                       
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
 responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime != null) {
+                try {
+                    InstanceCommunication.MetricsData metrics = 
runtime.getMetrics().get();
+                    responseObserver.onNext(metrics);
+                    responseObserver.onCompleted();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error("Exception in JavaInstance doing 
getAndResetMetrics", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public void resetMetrics(com.google.protobuf.Empty request,
+                io.grpc.stub.StreamObserver<com.google.protobuf.Empty> 
responseObserver) {
+            Runtime runtime = runtimeSpawner.getRuntime();
+            if (runtime != null) {
+                try {
+                    runtime.resetMetrics().get();
+                    responseObserver.onCompleted();
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error("Exception in JavaInstance doing 
getAndResetMetrics", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        
+        @Override
         public void healthCheck(com.google.protobuf.Empty request,
                                 
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
 responseObserver) {
             log.debug("Recieved health check request...");
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index bd4e3ae..95d7a53 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -347,6 +347,50 @@ class ProcessRuntime implements Runtime {
         return retval;
     }
 
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        CompletableFuture<Void> retval = new CompletableFuture<>();
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<Empty> response = 
stub.resetMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<Empty>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(Empty t) {
+                retval.complete(null);
+            }
+        });
+        return retval;
+    }
+
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        CompletableFuture<InstanceCommunication.MetricsData> retval = new 
CompletableFuture<>();
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub.getMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new 
FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+    
     public CompletableFuture<InstanceCommunication.HealthCheckResult> 
healthCheck() {
         CompletableFuture<InstanceCommunication.HealthCheckResult> retval = 
new CompletableFuture<>();
         if (stub == null) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index 3c71800..ea99290 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -41,5 +41,9 @@ public interface Runtime {
     CompletableFuture<InstanceCommunication.FunctionStatus> 
getFunctionStatus();
 
     CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
+    
+    /**
+     * Resets the metrics held by this runtime's function instance.
+     *
+     * @return a future that completes once the reset has been carried out
+     */
+    CompletableFuture<Void> resetMetrics();
+    
+    /**
+     * Fetches the function instance's current metrics. Unlike {@link #getAndResetMetrics()},
+     * this presumably leaves the metrics intact; {@link #resetMetrics()} clears them separately.
+     *
+     * @return a future completed with the current metrics
+     */
+    CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
 
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 9d9654d..3b53fb8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -113,6 +113,18 @@ class ThreadRuntime implements Runtime {
     public CompletableFuture<InstanceCommunication.MetricsData> 
getAndResetMetrics() {
         return 
CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics());
     }
+    
+    
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        return 
CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
+    }
+
+    /**
+     * Resets the metrics of the JavaInstanceRunnable backing this runtime. The reset runs
+     * synchronously on the calling thread, so the returned future is already complete.
+     */
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        javaInstanceRunnable.resetMetrics();
+        // Nothing left to wait for: the reset above has already run.
+        return CompletableFuture.<Void>completedFuture(null);
+    }
 
     @Override
     public boolean isAlive() {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 08de636..eb5afb9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 
@@ -42,6 +43,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -438,6 +440,22 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
         return this.functionRuntimeInfoMap;
     }
+    
+    /**
+     * Resets the metrics of every function runtime currently tracked by this manager.
+     *
+     * <p>Meant to be run periodically (the worker schedules it on a fixed-rate executor). Failures
+     * for one function are logged and do not stop the remaining functions from being processed.
+     */
+    public void updateRates() {
+        for (Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) {
+            try {
+                RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner();
+                if (functionRuntimeSpawner == null) {
+                    continue;
+                }
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime == null) {
+                    continue;
+                }
+                functionRuntime.resetMetrics().get();
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and stop iterating so the calling thread sees the interruption.
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while updating stats for {}", entry.getKey());
+                return;
+            } catch (Exception e) {
+                // Everything per-function is inside the try: an exception escaping a fixed-rate
+                // scheduled task would suppress all of its subsequent executions. The trailing
+                // throwable argument makes SLF4J log the full stack trace.
+                log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage(), e);
+            }
+        }
+    }
     /**
      * Private methods for internal use.  Should not be used outside of this 
class
      */
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 0a829cb..745f5d7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -49,7 +49,9 @@ public class FunctionsStatsGenerator {
                     Runtime functionRuntime = 
functionRuntimeSpawner.getRuntime();
                     if (functionRuntime != null) {
                         try {
-                            InstanceCommunication.MetricsData metrics = 
functionRuntime.getAndResetMetrics().get();
+                            InstanceCommunication.MetricsData metrics = 
workerService.getWorkerConfig()
+                                    .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
+                                            : 
functionRuntime.getAndResetMetrics().get();
                             for (Map.Entry<String, 
InstanceCommunication.MetricsData.DataDigest> metricsEntry
                                     : metrics.getMetricsMap().entrySet()) {
                                 String metricName = metricsEntry.getKey();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c7ebcf9..9d5dfaa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -73,7 +73,8 @@ public class WorkerConfig implements Serializable {
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
     private boolean tlsHostnameVerificationEnable = false;
-    
+    // Interval, in seconds, at which the worker resets function metrics. When > 0 the stats
+    // generator reads metrics with getMetrics() and the periodic task performs the resets; when
+    // <= 0 no task is scheduled and the generator falls back to getAndResetMetrics() on each read.
+    private int metricsSamplingPeriodSec = 60;
+
     @Data
     @Setter
     @Getter
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 4393331..29dff53 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,7 +20,14 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -49,11 +56,14 @@ public class WorkerService {
     private MembershipManager membershipManager;
     private SchedulerManager schedulerManager;
     private boolean isInitialized = false;
+    private final ScheduledExecutorService statsUpdater;
 
     private ConnectorsManager connectorsManager;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
+        // Single-threaded scheduler for the periodic metrics-reset task; start() only schedules
+        // work on it when metricsSamplingPeriodSec > 0.
+        // NOTE(review): no shutdown of this executor is visible here; confirm the worker's
+        // stop/close path calls shutdown(), otherwise its thread may outlive the service.
+        this.statsUpdater = Executors
+                .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
     }
 
     public void start(URI dlogUri) throws InterruptedException {
@@ -138,6 +148,14 @@ public class WorkerService {
             // indicate function worker service is done intializing
             this.isInitialized = true;
 
+            this.connectorsManager = new ConnectorsManager(workerConfig);
+            
+            int metricsSamplingPeriodSec = 
this.workerConfig.getMetricsSamplingPeriodSec();
+            if (metricsSamplingPeriodSec > 0) {
+                this.statsUpdater.scheduleAtFixedRate(() -> 
this.functionRuntimeManager.updateRates(),
+                        metricsSamplingPeriodSec, metricsSamplingPeriodSec, 
TimeUnit.SECONDS);
+            }
+
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 7bd85e8..d320514 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -52,6 +52,7 @@ public class FunctionStatsGeneratorTest {
 
         WorkerService workerService = mock(WorkerService.class);
         
doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
+        doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
 
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = 
InstanceCommunication.MetricsData.newBuilder()
@@ -66,7 +67,7 @@ public class FunctionStatsGeneratorTest {
 
         metricsDataCompletableFuture.complete(metricsData);
         Runtime runtime = mock(Runtime.class);
-        
doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics();
+        doReturn(metricsDataCompletableFuture).when(runtime).getMetrics();
 
         RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
         doReturn(runtime).when(runtimeSpawner).getRuntime();

Reply via email to