This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 14fc5d9 Schedule task to update function stats separately (#2128) 14fc5d9 is described below commit 14fc5d956c0044743984e839cce0acdd9d7f5336 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Jul 12 14:54:49 2018 -0700 Schedule task to update function stats separately (#2128) keep previous stats sample configurable update metrics task fix test --- conf/functions_worker.yml | 1 + .../pulsar/functions/instance/ContextImpl.java | 19 +++++++-- .../pulsar/functions/instance/FunctionStats.java | 25 +++++++++++- .../pulsar/functions/instance/JavaInstance.java | 8 ++++ .../functions/instance/JavaInstanceRunnable.java | 45 +++++++++++++++------ .../src/main/python/InstanceCommunication_pb2.py | 24 +++++++++-- .../main/python/InstanceCommunication_pb2_grpc.py | 34 ++++++++++++++++ .../instance/src/main/python/contextimpl.py | 23 ++++++++--- .../instance/src/main/python/python_instance.py | 46 +++++++++++++++++----- .../instance/src/main/python/server.py | 9 +++++ .../src/main/proto/InstanceCommunication.proto | 2 + .../pulsar/functions/runtime/JavaInstanceMain.java | 30 ++++++++++++++ .../pulsar/functions/runtime/ProcessRuntime.java | 44 +++++++++++++++++++++ .../apache/pulsar/functions/runtime/Runtime.java | 4 ++ .../pulsar/functions/runtime/ThreadRuntime.java | 12 ++++++ .../functions/worker/FunctionRuntimeManager.java | 18 +++++++++ .../functions/worker/FunctionsStatsGenerator.java | 4 +- .../pulsar/functions/worker/WorkerConfig.java | 3 +- .../pulsar/functions/worker/WorkerService.java | 18 +++++++++ .../worker/FunctionStatsGeneratorTest.java | 3 +- 20 files changed, 336 insertions(+), 36 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 73edbfb..58bcf1d 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -44,3 +44,4 @@ rescheduleTimeoutMs: 60000 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 +metricsSamplingPeriodSec: 60 diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 0a1b965..b41d6f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -91,6 +91,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { } } + private ConcurrentMap<String, AccumulatedMetricDatum> currentAccumulatedMetrics; private ConcurrentMap<String, AccumulatedMetricDatum> accumulatedMetrics; private Map<String, Producer> publishProducers; @@ -110,6 +111,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.logger = logger; this.pulsarClient = client; this.classLoader = classLoader; + this.currentAccumulatedMetrics = new ConcurrentHashMap<>(); this.accumulatedMetrics = new ConcurrentHashMap<>(); this.publishProducers = new HashMap<>(); this.publishSerializers = new HashMap<>(); @@ -324,11 +326,23 @@ class ContextImpl implements Context, SinkContext, SourceContext { @Override public void recordMetric(String metricName, double value) { - accumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); - accumulatedMetrics.get(metricName).update(value); + currentAccumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); + currentAccumulatedMetrics.get(metricName).update(value); } public MetricsData getAndResetMetrics() { + MetricsData retval = getMetrics(); + resetMetrics(); + return retval; + } + + public void resetMetrics() { + this.accumulatedMetrics.clear(); + this.accumulatedMetrics.putAll(currentAccumulatedMetrics); + this.currentAccumulatedMetrics.clear(); + } + + public MetricsData getMetrics() { MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); for (String metricName : accumulatedMetrics.keySet()) { MetricsData.DataDigest.Builder bldr = MetricsData.DataDigest.newBuilder(); @@ -339,7 +353,6 @@ class ContextImpl implements Context, SinkContext, SourceContext { metricsDataBuilder.putMetrics(metricName, bldr.build()); } MetricsData retval = metricsDataBuilder.build(); - accumulatedMetrics.clear(); return retval; } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java index 899777c..c45837b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStats.java @@ -102,13 +102,35 @@ public class FunctionStats { return totalLatencyMs / totalSuccessfullyProcessed; } } + + public void update(Stats stats) { + if (stats == null) { + return; + } + this.totalProcessed = stats.totalProcessed; + this.totalSuccessfullyProcessed = stats.totalSuccessfullyProcessed; + this.totalUserExceptions = stats.totalUserExceptions; + this.latestUserExceptions.clear(); + this.latestSystemExceptions.clear(); + this.totalDeserializationExceptions.clear(); + this.latestUserExceptions.addAll(stats.latestUserExceptions); + this.latestSystemExceptions.addAll(stats.latestSystemExceptions); + this.totalDeserializationExceptions.putAll(stats.totalDeserializationExceptions); + this.totalSystemExceptions = stats.totalSystemExceptions; + this.latestSystemExceptions = stats.latestSystemExceptions; + this.totalSerializationExceptions = stats.totalSerializationExceptions; + this.totalLatencyMs = stats.totalLatencyMs; + this.lastInvocationTime = stats.lastInvocationTime; + } } private Stats currentStats; private Stats totalStats; - + private Stats stats; + public FunctionStats() { currentStats = new Stats(); + stats = new Stats(); totalStats = new Stats(); } @@ -138,6 +160,7 @@ public class FunctionStats { totalStats.incrementSerializationExceptions(); } public void resetCurrent() { + stats.update(currentStats); currentStats.reset(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 496b43f..42ade3f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -76,4 +76,12 @@ public class JavaInstance implements AutoCloseable { public InstanceCommunication.MetricsData getAndResetMetrics() { return context.getAndResetMetrics(); } + + public void resetMetrics() { + context.resetMetrics(); + } + + public InstanceCommunication.MetricsData getMetrics() { + return context.getMetrics(); + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d2f832d..ba0de38 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -52,9 +52,9 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.sink.PulsarSink; @@ -389,16 +389,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } public InstanceCommunication.MetricsData getAndResetMetrics() { - InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - addSystemMetrics("__total_processed__", stats.getCurrentStats().getTotalProcessed(), bldr); - addSystemMetrics("__total_successfully_processed__", stats.getCurrentStats().getTotalSuccessfullyProcessed(), bldr); - addSystemMetrics("__total_system_exceptions__", stats.getCurrentStats().getTotalSystemExceptions(), bldr); - addSystemMetrics("__total_user_exceptions__", stats.getCurrentStats().getTotalUserExceptions(), bldr); - stats.getCurrentStats().getTotalDeserializationExceptions().forEach((topic, count) -> { - addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr); - }); - addSystemMetrics("__total_serialization_exceptions__", stats.getCurrentStats().getTotalSerializationExceptions(), bldr); - addSystemMetrics("__avg_latency_ms__", stats.getCurrentStats().computeLatency(), bldr); + InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); stats.resetCurrent(); if (javaInstance != null) { InstanceCommunication.MetricsData userMetrics = javaInstance.getAndResetMetrics(); @@ -409,6 +400,38 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { return bldr.build(); } + public InstanceCommunication.MetricsData getMetrics() { + InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); + if (javaInstance != null) { + InstanceCommunication.MetricsData userMetrics = javaInstance.getMetrics(); + if (userMetrics != null) { + bldr.putAllMetrics(userMetrics.getMetricsMap()); + } + } + return bldr.build(); + } + + public void resetMetrics() { + stats.resetCurrent(); + javaInstance.resetMetrics(); + } + + private Builder createMetricsDataBuilder() { + InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); + addSystemMetrics("__total_processed__", stats.getStats().getTotalProcessed(), bldr); + addSystemMetrics("__total_successfully_processed__", stats.getStats().getTotalSuccessfullyProcessed(), + bldr); + addSystemMetrics("__total_system_exceptions__", stats.getStats().getTotalSystemExceptions(), bldr); + addSystemMetrics("__total_user_exceptions__", stats.getStats().getTotalUserExceptions(), bldr); + stats.getStats().getTotalDeserializationExceptions().forEach((topic, count) -> { + addSystemMetrics("__total_deserialization_exceptions__" + topic, count, bldr); + }); + addSystemMetrics("__total_serialization_exceptions__", + stats.getStats().getTotalSerializationExceptions(), bldr); + addSystemMetrics("__avg_latency_ms__", stats.getStats().computeLatency(), bldr); + return bldr; + } + public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); functionStatusBuilder.setNumProcessed(stats.getTotalStats().getTotalProcessed()); diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py index b4e814d..49c77d6 100644 --- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py +++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='InstanceCommunication.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] + serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.Ex [...] , dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) @@ -513,7 +513,7 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor( index=0, options=None, serialized_start=1068, - serialized_end=1290, + serialized_end=1416, methods=[ _descriptor.MethodDescriptor( name='GetFunctionStatus', @@ -534,9 +534,27 @@ _INSTANCECONTROL = _descriptor.ServiceDescriptor( options=None, ), _descriptor.MethodDescriptor( + name='ResetMetrics', + full_name='proto.InstanceControl.ResetMetrics', + index=2, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + options=None, + ), + _descriptor.MethodDescriptor( + name='GetMetrics', + full_name='proto.InstanceControl.GetMetrics', + index=3, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=_METRICSDATA, + options=None, + ), + _descriptor.MethodDescriptor( name='HealthCheck', full_name='proto.InstanceControl.HealthCheck', - index=2, + index=4, containing_service=None, input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, output_type=_HEALTHCHECKRESULT, diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py index 575f994..21730e1 100644 --- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py +++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py @@ -44,6 +44,16 @@ class InstanceControlStub(object): request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, response_deserializer=InstanceCommunication__pb2.MetricsData.FromString, ) + self.ResetMetrics = channel.unary_unary( + '/proto.InstanceControl/ResetMetrics', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + ) + self.GetMetrics = channel.unary_unary( + '/proto.InstanceControl/GetMetrics', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=InstanceCommunication__pb2.MetricsData.FromString, + ) self.HealthCheck = channel.unary_unary( '/proto.InstanceControl/HealthCheck', request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, @@ -69,6 +79,20 @@ class InstanceControlServicer(object): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def ResetMetrics(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetMetrics(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def HealthCheck(self, request, context): # missing associated documentation comment in .proto file pass @@ -89,6 +113,16 @@ def add_InstanceControlServicer_to_server(servicer, server): request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString, ), + 'ResetMetrics': grpc.unary_unary_rpc_method_handler( + servicer.ResetMetrics, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'GetMetrics': grpc.unary_unary_rpc_method_handler( + servicer.GetMetrics, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString, + ), 'HealthCheck': grpc.unary_unary_rpc_method_handler( servicer.HealthCheck, request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 9f4a34f..f0f9d0f 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -54,6 +54,7 @@ class ContextImpl(pulsar.Context): self.pulsar_client = pulsar_client self.user_code_dir = os.path.dirname(user_code) self.consumers = consumers + self.current_accumulated_metrics = {} self.accumulated_metrics = {} self.publish_producers = {} self.publish_serializers = {} @@ -107,9 +108,9 @@ class ContextImpl(pulsar.Context): return self.user_config def record_metric(self, metric_name, metric_value): - if not metric_name in self.accumulated_metrics: - self.accumulated_metrics[metric_name] = AccumulatedMetricDatum() - self.accumulated_metrics[metric_name].update(metric_value) + if not metric_name in self.current_accumulated_metrics: + self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum() + self.current_accumulated_metrics[metric_name].update(metric_value) def get_output_topic(self): return self.instance_config.function_details.output @@ -143,6 +144,18 @@ class ContextImpl(pulsar.Context): self.consumers[topic].acknowledge(msgid) def get_and_reset_metrics(self): + metrics = self.get_metrics() + # TODO(sanjeev):- Make this thread safe + self.reset_metrics() + return metrics + + def reset_metrics(self): + # TODO: Make it thread safe + self.accumulated_metrics.clear() + self.accumulated_metrics.update(self.current_accumulated_metrics) + self.current_accumulated_metrics.clear() + + def get_metrics(self): metrics = InstanceCommunication_pb2.MetricsData() for metric_name, accumulated_metric in self.accumulated_metrics.items(): m = InstanceCommunication_pb2.MetricsData.DataDigest() @@ -151,6 +164,4 @@ class ContextImpl(pulsar.Context): m.max = accumulated_metric.max m.min = accumulated_metric.min metrics.metrics[metric_name] = m - # TODO(sanjeev):- Make this thread safe - self.accumulated_metrics.clear() - return metrics + return metrics \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 7120249..70b78f0 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -97,6 +97,22 @@ class Stats(object): else: return self.latency / self.nsuccessfullyprocessed + def update(self, object): + self.nprocessed = object.nprocessed + self.nsuccessfullyprocessed = object.nsuccessfullyprocessed + self.nuserexceptions = object.nuserexceptions + self.nsystemexceptions = object.nsystemexceptions + self.nserialization_exceptions = object.nserialization_exceptions + self.latency = object.latency + self.lastinvocationtime = object.lastinvocationtime + self.latestuserexceptions = [] + self.latestsystemexceptions = [] + self.ndeserialization_exceptions.clear() + self.latestuserexceptions.append(object.latestuserexceptions) + self.latestsystemexceptions.append(object.latestsystemexceptions) + self.ndeserialization_exceptions.update(object.ndeserialization_exceptions) + + class PythonInstance(object): def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, user_code, log_topic, pulsar_client): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) @@ -119,6 +135,7 @@ class PythonInstance(object): self.contextimpl = None self.total_stats = Stats() self.current_stats = Stats() + self.stats = Stats() self.last_health_check_ts = time.time() self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None @@ -278,19 +295,30 @@ class PythonInstance(object): def get_and_reset_metrics(self): # First get any user metrics - metrics = self.contextimpl.get_and_reset_metrics() + metrics = self.get_metrics() + self.reset_metrics() + return metrics + + def reset_metrics(self): + self.stats.update(self.current_stats) + self.current_stats.reset() + self.contextimpl.reset_metrics() + + def get_metrics(self): + # First get any user metrics + metrics = self.contextimpl.get_metrics() # Now add system metrics as well - self.add_system_metrics("__total_processed__", self.current_stats.nprocessed, metrics) - self.add_system_metrics("__total_successfully_processed__", self.current_stats.nsuccessfullyprocessed, metrics) - self.add_system_metrics("__total_system_exceptions__", self.current_stats.nsystemexceptions, metrics) - self.add_system_metrics("__total_user_exceptions__", self.current_stats.nuserexceptions, metrics) - for (topic, metric) in self.current_stats.ndeserialization_exceptions.items(): + self.add_system_metrics("__total_processed__", self.stats.nprocessed, metrics) + self.add_system_metrics("__total_successfully_processed__", self.stats.nsuccessfullyprocessed, metrics) + self.add_system_metrics("__total_system_exceptions__", self.stats.nsystemexceptions, metrics) + self.add_system_metrics("__total_user_exceptions__", self.stats.nuserexceptions, metrics) + for (topic, metric) in self.stats.ndeserialization_exceptions.items(): self.add_system_metrics("__total_deserialization_exceptions__" + topic, metric, metrics) - self.add_system_metrics("__total_serialization_exceptions__", self.current_stats.nserialization_exceptions, metrics) - self.add_system_metrics("__avg_latency_ms__", self.current_stats.compute_latency(), metrics) - self.current_stats.reset() + self.add_system_metrics("__total_serialization_exceptions__", self.stats.nserialization_exceptions, metrics) + self.add_system_metrics("__avg_latency_ms__", self.stats.compute_latency(), metrics) return metrics + def add_system_metrics(self, metric_name, value, metrics): metrics.metrics[metric_name].count = value metrics.metrics[metric_name].sum = value diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 193e6ab..611a737 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -42,6 +42,15 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr Log.info("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() + def ResetMetrics(self, request, context): + Log.info("Came in ResetMetrics") + self.pyinstance.reset_metrics() + return request + + def GetMetrics(self, request, context): + Log.info("Came in GetMetrics") + return self.pyinstance.get_metrics() + def HealthCheck(self, request, context): return self.pyinstance.health_check() diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 8db742b..ed8c95a 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -72,5 +72,7 @@ message HealthCheckResult { service InstanceControl { rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatus) {} rpc GetAndResetMetrics(google.protobuf.Empty) returns (MetricsData) {} + rpc ResetMetrics(google.protobuf.Empty) returns (google.protobuf.Empty) {} + rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {} rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {} } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 5a63ec3..0c10fdb 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -349,6 +349,36 @@ public class JavaInstanceMain implements AutoCloseable { } @Override + public void getMetrics(com.google.protobuf.Empty request, + io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) { + Runtime runtime = runtimeSpawner.getRuntime(); + if (runtime != null) { + try { + InstanceCommunication.MetricsData metrics = runtime.getMetrics().get(); + responseObserver.onNext(metrics); + responseObserver.onCompleted(); + } catch (InterruptedException | ExecutionException e) { + log.error("Exception in JavaInstance doing getAndResetMetrics", e); + throw new RuntimeException(e); + } + } + } + + public void resetMetrics(com.google.protobuf.Empty request, + io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) { + Runtime runtime = runtimeSpawner.getRuntime(); + if (runtime != null) { + try { + runtime.resetMetrics().get(); + responseObserver.onCompleted(); + } catch (InterruptedException | ExecutionException e) { + log.error("Exception in JavaInstance doing getAndResetMetrics", e); + throw new RuntimeException(e); + } + } + } + + @Override public void healthCheck(com.google.protobuf.Empty request, io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) { log.debug("Recieved health check request..."); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index bd4e3ae..95d7a53 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -347,6 +347,50 @@ class ProcessRuntime implements Runtime { return retval; } + @Override + public CompletableFuture<Void> resetMetrics() { + CompletableFuture<Void> retval = new CompletableFuture<>(); + if (stub == null) { + retval.completeExceptionally(new RuntimeException("Not alive")); + return retval; + } + ListenableFuture<Empty> response = stub.resetMetrics(Empty.newBuilder().build()); + Futures.addCallback(response, new FutureCallback<Empty>() { + @Override + public void onFailure(Throwable throwable) { + retval.completeExceptionally(throwable); + } + + @Override + public void onSuccess(Empty t) { + retval.complete(null); + } + }); + return retval; + } + + @Override + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>(); + if (stub == null) { + retval.completeExceptionally(new RuntimeException("Not alive")); + return retval; + } + ListenableFuture<InstanceCommunication.MetricsData> response = stub.getMetrics(Empty.newBuilder().build()); + Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { + @Override + public void onFailure(Throwable throwable) { + retval.completeExceptionally(throwable); + } + + @Override + public void onSuccess(InstanceCommunication.MetricsData t) { + retval.complete(t); + } + }); + return retval; + } + public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() { CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>(); if (stub == null) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java index 3c71800..ea99290 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java @@ -41,5 +41,9 @@ public interface Runtime { CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(); CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics(); + + CompletableFuture<Void> resetMetrics(); + + CompletableFuture<InstanceCommunication.MetricsData> getMetrics(); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 9d9654d..3b53fb8 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -113,6 +113,18 @@ class ThreadRuntime implements Runtime { public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() { return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics()); } + + + @Override + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics()); + } + + @Override + public CompletableFuture<Void> resetMetrics() { + javaInstanceRunnable.resetMetrics(); + return CompletableFuture.completedFuture(null); + } @Override public boolean isAlive() { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 08de636..eb5afb9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -31,6 +31,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; +import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; @@ -42,6 +43,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; @@ -438,6 +440,22 @@ public class FunctionRuntimeManager implements AutoCloseable{ public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() { return this.functionRuntimeInfoMap; } + + public void updateRates() { + for (Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) { + RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner(); + if (functionRuntimeSpawner != null) { + Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); + if (functionRuntime != null) { + try { + functionRuntime.resetMetrics().get(); + } catch (Exception e) { + log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage()); + } + } + } + } + } /** * Private methods for internal use. Should not be used outside of this class */ diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 0a829cb..745f5d7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -49,7 +49,9 @@ public class FunctionsStatsGenerator { Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); if (functionRuntime != null) { try { - InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get(); + InstanceCommunication.MetricsData metrics = workerService.getWorkerConfig() + .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get() + : functionRuntime.getAndResetMetrics().get(); for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry : metrics.getMetricsMap().entrySet()) { String metricName = metricsEntry.getKey(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index c7ebcf9..9d5dfaa 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -73,7 +73,8 @@ public class WorkerConfig implements Serializable { private String tlsTrustCertsFilePath = ""; private boolean tlsAllowInsecureConnection = false; private boolean tlsHostnameVerificationEnable = false; - + private int metricsSamplingPeriodSec = 60; + @Data @Setter @Getter diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 4393331..29dff53 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -20,7 +20,14 @@ package org.apache.pulsar.functions.worker; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + +import io.netty.util.concurrent.DefaultThreadFactory; + import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -49,11 +56,14 @@ public class WorkerService { private MembershipManager membershipManager; private SchedulerManager schedulerManager; private boolean isInitialized = false; + private final ScheduledExecutorService statsUpdater; private ConnectorsManager connectorsManager; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; + this.statsUpdater = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); } public void start(URI dlogUri) throws InterruptedException { @@ -138,6 +148,14 @@ public class WorkerService { // indicate function worker service is done intializing this.isInitialized = true; + this.connectorsManager = new ConnectorsManager(workerConfig); + + int metricsSamplingPeriodSec = this.workerConfig.getMetricsSamplingPeriodSec(); + if (metricsSamplingPeriodSec > 0) { + this.statsUpdater.scheduleAtFixedRate(() -> this.functionRuntimeManager.updateRates(), + metricsSamplingPeriodSec, metricsSamplingPeriodSec, TimeUnit.SECONDS); + } + } catch (Throwable t) { log.error("Error Starting up in worker", t); throw new RuntimeException(t); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 7bd85e8..d320514 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -52,6 +52,7 @@ public class FunctionStatsGeneratorTest { WorkerService workerService = mock(WorkerService.class); doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager(); + doReturn(new WorkerConfig()).when(workerService).getWorkerConfig(); CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>(); InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder() @@ -66,7 +67,7 @@ public class FunctionStatsGeneratorTest { metricsDataCompletableFuture.complete(metricsData); Runtime runtime = mock(Runtime.class); - doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics(); + doReturn(metricsDataCompletableFuture).when(runtime).getMetrics(); RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class); doReturn(runtime).when(runtimeSpawner).getRuntime();