This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 130df3c  fix healthmgr metrics (#2839)
130df3c is described below

commit 130df3c17e26126f1e7003ed069f8fc6e64a3d3f
Author: Huijun Wu <huij...@twitter.com>
AuthorDate: Thu Apr 12 20:50:18 2018 -0700

    fix healthmgr metrics (#2839)
    
    * fix healthmgr metrics
    
    * fix exception null
    
    * fix metrics flush
    
    * fill metricsmgr client
    
    * add debug print
    
    * add next timer
    
    * fix metrics
    
    * fix style
    
    * fix unit
    
    * remove unnecessary comment
    
    * fix typo
    
    * update style
    
    * update style 2
    
    * update style3
    
    * make systemconfigfile optional
    
    * staging
    
    * fix compile
    
    * replace singleton with guice singleton
    
    * update sensor with guice singleton
    
    * fixcompile
    
    * rename metricsPublisher
---
 heron/executor/src/python/heron_executor.py        |   3 +-
 .../tests/python/heron_executor_unittest.py        |   2 +-
 .../org/apache/heron/healthmgr/HealthManager.java  |  49 +++++--
 .../heron/healthmgr/HealthManagerMetrics.java      | 153 ++++++++++++++++-----
 .../healthmgr/detectors/BackPressureDetector.java  |   9 +-
 .../diagnosers/SlowInstanceDiagnoser.java          |  14 ++
 .../resolvers/RestartContainerResolver.java        |  10 +-
 .../healthmgr/sensors/BackPressureSensor.java      |  10 +-
 .../apache/heron/healthmgr/HealthManagerTest.java  |   2 +-
 9 files changed, 196 insertions(+), 56 deletions(-)

diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 7e64a33..cbacf4b 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -498,7 +498,8 @@ class HeronExecutor(object):
                      "--cluster", self.cluster,
                      "--role", self.role,
                      "--environment", self.environment,
-                     "--topology_name", self.topology_name]
+                     "--topology_name", self.topology_name,
+                     "--metricsmgr_port", self.metrics_manager_port]
 
     return healthmgr_cmd
 
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 76caff1..05d7e3b 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -131,7 +131,7 @@ class HeronExecutorTest(unittest.TestCase):
              "-Xloggc:log-files/gc.healthmgr.log 
-Djava.net.preferIPv4Stack=true " \
              "-cp scheduler_classpath:healthmgr_classpath " \
              "org.apache.heron.healthmgr.HealthManager --cluster cluster 
--role role " \
-             "--environment environ --topology_name topname"
+             "--environment environ --topology_name topname --metricsmgr_port 
metricsmgr_port"
 
   def get_expected_instance_command(component_name, instance_id, container_id):
     instance_name = "container_%d_%s_%d" % (container_id, component_name, 
instance_id)
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java
index 684d260..3e9c3fc 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java
@@ -18,6 +18,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
@@ -44,6 +45,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.heron.classification.InterfaceStability.Evolving;
 import org.apache.heron.classification.InterfaceStability.Unstable;
+import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
 import org.apache.heron.common.utils.logging.LoggingHelper;
 import org.apache.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
@@ -194,6 +196,14 @@ public class HealthManager {
 
     setupLogging(cmd, config);
 
+    LOG.fine(Arrays.toString(cmd.getOptions()));
+
+    // Add the SystemConfig into SingletonRegistry
+    SystemConfig systemConfig = SystemConfig.newBuilder(true)
+        .putAll(Context.systemFile(config), true)
+        .putAll(Context.overrideFile(config), true).build();
+    
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, 
systemConfig);
+
     LOG.info("Static Heron config loaded successfully ");
     LOG.fine(config.toString());
 
@@ -204,32 +214,29 @@ public class HealthManager {
     String metricsUrl = 
config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
     metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);
 
-    AbstractModule module = buildMetricsProviderModule(metricsUrl, 
metricSourceClassName);
+    // metrics reporting thread
+    HealthManagerMetrics publishingMetrics =
+        new HealthManagerMetrics(Integer.valueOf(getOptionValue(cmd, 
CliArgs.METRICSMGR_PORT)));
+
+    AbstractModule module
+        = buildBaseModule(metricsUrl, metricSourceClassName, 
publishingMetrics);
     HealthManager healthManager = new HealthManager(config, module);
 
     LOG.info("Initializing health manager");
     healthManager.initialize();
 
-    LOG.info("Starting Health Manager metirc posting thread");
-    HealthManagerMetrics publishingMetricsRunnable = null;
-    if (hasOption(cmd, CliArgs.METRICSMGR_PORT)) {
-      publishingMetricsRunnable = new HealthManagerMetrics(
-          Integer.valueOf(getOptionValue(cmd, CliArgs.METRICSMGR_PORT)));
-    }
-
     LOG.info("Starting Health Manager");
     PoliciesExecutor policyExecutor = new 
PoliciesExecutor(healthManager.healthPolicies);
     ScheduledFuture<?> future = policyExecutor.start();
-    if (publishingMetricsRunnable != null) {
-      new Thread(publishingMetricsRunnable).start();
-    }
+
+    LOG.info("Starting Health Manager metric posting thread");
+    new Thread(publishingMetrics).start();
+
     try {
       future.get();
     } finally {
       policyExecutor.destroy();
-      if (publishingMetricsRunnable != null) {
-        publishingMetricsRunnable.close();
-      }
+      publishingMetrics.close();
     }
   }
 
@@ -319,7 +326,8 @@ public class HealthManager {
   }
 
   @VisibleForTesting
-  static AbstractModule buildMetricsProviderModule(final String sourceUrl, 
final String type) {
+  static AbstractModule buildBaseModule(final String sourceUrl, final String 
type,
+                                        final HealthManagerMetrics 
publishingMetrics) {
     return new AbstractModule() {
       @Override
       protected void configure() {
@@ -329,6 +337,8 @@ public class HealthManager {
         bind(String.class)
             .annotatedWith(Names.named(CONF_METRICS_SOURCE_TYPE))
             .toInstance(type);
+        bind(HealthManagerMetrics.class)
+            .toInstance(publishingMetrics);
       }
     };
   }
@@ -501,6 +511,14 @@ public class HealthManager {
         .argName("process mode")
         .build();
 
+    Option metricsMgrPort = Option.builder("m")
+        .desc("Port of local MetricsManager")
+        .longOpt(CliArgs.METRICSMGR_PORT.text)
+        .hasArgs()
+        .argName("metrics_manager port")
+        .required()
+        .build();
+
     Option verbose = Option.builder("v")
         .desc("Enable debug logs")
         .longOpt(CliArgs.VERBOSE.text)
@@ -515,6 +533,7 @@ public class HealthManager {
     options.addOption(metricsSourceType);
     options.addOption(metricsSourceURL);
     options.addOption(mode);
+    options.addOption(metricsMgrPort);
     options.addOption(verbose);
 
     return options;
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java
index 478ed1f..1989fec45 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java
@@ -15,13 +15,19 @@
 package org.apache.heron.healthmgr;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.time.Duration;
+import java.util.Map.Entry;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
 import com.google.protobuf.Message;
 
 import org.apache.heron.api.metric.MultiCountMetric;
-import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.NIOLooper;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
@@ -29,31 +35,40 @@ import org.apache.heron.common.network.HeronClient;
 import org.apache.heron.common.network.HeronSocketOptions;
 import org.apache.heron.common.network.StatusCode;
 import org.apache.heron.common.utils.metrics.JVMMetrics;
+import org.apache.heron.proto.system.Common;
 import org.apache.heron.proto.system.Metrics;
 
 /**
  * HealthMgr's metrics to be collect
  */
-
+@Singleton
 public class HealthManagerMetrics implements Runnable, AutoCloseable {
+  public static final String METRICS_THREAD = "HealthManagerMetrics";
   private static final Logger LOG = 
Logger.getLogger(HealthManagerMetrics.class.getName());
   private static final String METRICS_MGR_HOST = "127.0.0.1";
 
+  private final String metricsPrefix = "__healthmgr/";
+  private final String metricsSensor = metricsPrefix + "sensor/";
+  private final String metricsDetector = metricsPrefix + "detector/";
+  private final String metricsDiagnoser = metricsPrefix + "diagnoser/";
+  private final String metricsResolver = metricsPrefix + "resolver/";
+  private final String metricsName = metricsPrefix + "customized/";
   private final JVMMetrics jvmMetrics;
   private final MultiCountMetric executeSensorCount;
   private final MultiCountMetric executeDetectorCount;
   private final MultiCountMetric executeDiagnoserCount;
   private final MultiCountMetric executeResolverCount;
+  private final MultiCountMetric executeCount;
 
   private NIOLooper looper;
   private HeronClient metricsMgrClient;
-  private Communicator<Metrics.MetricPublisherPublishMessage> outMetricsQueues;
 
   /**
    * constructor to expose healthmgr metrics to local metricsmgr
    * @param metricsMgrPort local MetricsMgr port
    * @throws IOException
    */
+  @Inject
   public HealthManagerMetrics(int metricsMgrPort) throws IOException {
     jvmMetrics = new JVMMetrics();
 
@@ -61,6 +76,7 @@ public class HealthManagerMetrics implements Runnable, 
AutoCloseable {
     executeDetectorCount = new MultiCountMetric();
     executeDiagnoserCount = new MultiCountMetric();
     executeResolverCount = new MultiCountMetric();
+    executeCount = new MultiCountMetric();
 
     looper = new NIOLooper();
 
@@ -76,38 +92,58 @@ public class HealthManagerMetrics implements Runnable, 
AutoCloseable {
             systemConfig.getInstanceNetworkOptionsSocketReceivedBufferSize(),
             systemConfig.getInstanceNetworkOptionsMaximumPacketSize());
     metricsMgrClient =
-        new MetricsMgrClient(looper, METRICS_MGR_HOST, metricsMgrPort, 
socketOptions);
-
-    outMetricsQueues = new 
Communicator<Metrics.MetricPublisherPublishMessage>(null, looper);
-    
outMetricsQueues.init(systemConfig.getInstanceInternalMetricsWriteQueueCapacity(),
-        systemConfig.getInstanceTuningExpectedMetricsWriteQueueSize(),
-        systemConfig.getInstanceTuningCurrentSampleWeight());
+        new SimpleMetricsManagerClient(looper, METRICS_MGR_HOST, 
metricsMgrPort, socketOptions);
 
     int interval = (int) 
systemConfig.getHeronMetricsExportInterval().getSeconds();
 
+
     looper.registerTimerEvent(Duration.ofSeconds(interval), new Runnable() {
       @Override
       public void run() {
-        jvmMetrics.getJVMSampleRunnable().run();
-
-        // push to container 0 metricsMgr
-        if (!metricsMgrClient.isConnected()) {
-          return;
-        }
-
-        LOG.info("Flushing all pending data in MetricsManagerClient");
-        // Collect all tuples in queue
-        int size = outMetricsQueues.size();
-        for (int i = 0; i < size; i++) {
-          Metrics.MetricPublisherPublishMessage m = outMetricsQueues.poll();
-          metricsMgrClient.sendMessage(m);
+        sendMetrics();
+        // next timer task
+        if (looper != null) {
+          looper.registerTimerEvent(Duration.ofSeconds(interval), new 
Runnable() {
+            @Override
+            public void run() {
+              sendMetrics();
+            }
+          });
         }
       }
     });
   }
 
-  public Communicator<Metrics.MetricPublisherPublishMessage> getMetricsQueue() 
{
-    return outMetricsQueues;
+  private void sendMetrics() {
+    jvmMetrics.getJVMSampleRunnable().run();
+
+    if (!metricsMgrClient.isConnected()) {
+      return;
+    }
+
+    LOG.info("Flushing sensor/detector/diagnoser/resolver metrics");
+    Metrics.MetricPublisherPublishMessage.Builder builder =
+        Metrics.MetricPublisherPublishMessage.newBuilder();
+    addMetrics(builder, executeSensorCount, metricsSensor);
+    addMetrics(builder, executeDetectorCount, metricsDetector);
+    addMetrics(builder, executeDiagnoserCount, metricsDiagnoser);
+    addMetrics(builder, executeResolverCount, metricsResolver);
+    addMetrics(builder, executeCount, metricsName);
+    Metrics.MetricPublisherPublishMessage msg = builder.build();
+    LOG.fine(msg.toString());
+    metricsMgrClient.sendMessage(msg);
+  }
+
+  private void addMetrics(Metrics.MetricPublisherPublishMessage.Builder b, 
MultiCountMetric m,
+      String prefix) {
+    for (Entry<String, Long> e : m.getValueAndReset().entrySet()) {
+      b.addMetrics(Metrics.MetricDatum.newBuilder().setName(prefix + 
e.getKey())
+          .setValue(e.getValue().toString()));
+    }
+  }
+
+  public synchronized void executeIncr(String metricName) {
+    executeCount.scope(metricName).incr();
   }
 
   public synchronized void executeSensorIncr(String sensor) {
@@ -136,44 +172,91 @@ public class HealthManagerMetrics implements Runnable, 
AutoCloseable {
   public void close() throws Exception {
     looper.exitLoop();
     metricsMgrClient.stop();
-    outMetricsQueues.clear();
   }
 
-  class MetricsMgrClient extends HeronClient {
+  class SimpleMetricsManagerClient extends HeronClient {
+    private SystemConfig systemConfig;
+    private String hostname;
 
-    MetricsMgrClient(NIOLooper s, String host, int port, HeronSocketOptions 
options) {
+    SimpleMetricsManagerClient(NIOLooper s, String host, int port, 
HeronSocketOptions options) {
       super(s, host, port, options);
-      // TODO Auto-generated constructor stub
+      systemConfig =
+          (SystemConfig) 
SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
+      try {
+        this.hostname = InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        throw new RuntimeException("GetHostName failed");
+      }
     }
 
     @Override
     public void onError() {
-      // TODO Auto-generated method stub
+      LOG.severe("Disconnected from Metrics Manager.");
 
+      // Dispatch to onConnect(...)
+      onConnect(StatusCode.CONNECT_ERROR);
     }
 
     @Override
     public void onConnect(StatusCode status) {
-      // TODO Auto-generated method stub
+      if (status != StatusCode.OK) {
+        LOG.log(Level.WARNING,
+            "Cannot connect to the local metrics mgr with status: {0}, Will 
Retry..", status);
+        Runnable r = new Runnable() {
+          public void run() {
+            start();
+          }
+        };
+
+        
getNIOLooper().registerTimerEvent(systemConfig.getInstanceReconnectMetricsmgrInterval(),
 r);
+        return;
+      }
 
+      LOG.info("Connected to Metrics Manager. Ready to send register request");
+      sendRegisterRequest();
+    }
+
+    private void sendRegisterRequest() {
+      Metrics.MetricPublisher publisher = 
Metrics.MetricPublisher.newBuilder().setHostname(hostname)
+          
.setPort(getSocketChannel().socket().getPort()).setComponentName("__healthmgr__")
+          .setInstanceId("healthmgr-0").setInstanceIndex(-1).build();
+      Metrics.MetricPublisherRegisterRequest request =
+          
Metrics.MetricPublisherRegisterRequest.newBuilder().setPublisher(publisher).build();
+
+      // The timeout would be the reconnect-interval-seconds
+      sendRequest(request, null, 
Metrics.MetricPublisherRegisterResponse.newBuilder(),
+          systemConfig.getInstanceReconnectMetricsmgrInterval());
     }
 
     @Override
     public void onResponse(StatusCode status, Object ctx, Message response) {
-      // TODO Auto-generated method stub
+      if (status != StatusCode.OK) {
+        throw new RuntimeException("Response from Metrics Manager not ok");
+      }
+      if (Metrics.MetricPublisherRegisterResponse.class.isInstance(response)) {
+        handleRegisterResponse((Metrics.MetricPublisherRegisterResponse) 
response);
+      } else {
+        throw new RuntimeException("Unknown kind of response received from 
Metrics Manager");
+      }
+    }
+
+    private void 
handleRegisterResponse(Metrics.MetricPublisherRegisterResponse response) {
+      if (response.getStatus().getStatus() != Common.StatusCode.OK) {
+        throw new RuntimeException("Metrics Manager returned a not ok response 
for register");
+      }
 
+      LOG.info("We registered ourselves to the Metrics Manager");
     }
 
     @Override
     public void onIncomingMessage(Message message) {
-      // TODO Auto-generated method stub
-
+      throw new RuntimeException(
+          "SimpleMetricsManagerClient got an unknown message from Metrics 
Manager");
     }
 
     @Override
     public void onClose() {
-      // TODO Auto-generated method stub
-
+      LOG.info("SimpleMetricsManagerClient exits");
     }
 
   }
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java
index 5fe0824..a0edb7c 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java
@@ -28,6 +28,7 @@ import com.microsoft.dhalion.core.Measurement;
 import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.core.Symptom;
 
+import org.apache.heron.healthmgr.HealthManagerMetrics;
 import org.apache.heron.healthmgr.HealthPolicyConfig;
 
 import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
@@ -35,14 +36,18 @@ import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMP
 import static 
org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 public class BackPressureDetector extends BaseDetector {
+  public static final String BACK_PRESSURE_DETECTOR = "BackPressureDetector";
   static final String CONF_NOISE_FILTER = 
"BackPressureDetector.noiseFilterMillis";
 
   private static final Logger LOG = 
Logger.getLogger(BackPressureDetector.class.getName());
   private final int noiseFilterMillis;
+  private HealthManagerMetrics publishingMetrics;
 
   @Inject
-  BackPressureDetector(HealthPolicyConfig policyConfig) {
+  BackPressureDetector(HealthPolicyConfig policyConfig,
+                       HealthManagerMetrics publishingMetrics) {
     noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
+    this.publishingMetrics = publishingMetrics;
   }
 
   /**
@@ -53,6 +58,8 @@ public class BackPressureDetector extends BaseDetector {
    */
   @Override
   public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    publishingMetrics.executeDetectorIncr(BACK_PRESSURE_DETECTOR);
+
     Collection<Symptom> result = new ArrayList<>();
     Instant now = context.checkpoint();
 
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
index 08a47fc..1b2fdf6 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
@@ -19,11 +19,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.logging.Logger;
 
+import javax.inject.Inject;
+
 import com.microsoft.dhalion.core.Diagnosis;
 import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.core.Symptom;
 import com.microsoft.dhalion.core.SymptomsTable;
 
+import org.apache.heron.healthmgr.HealthManagerMetrics;
+
 import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
 import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
 import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
@@ -31,10 +35,20 @@ import static 
org.apache.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.
 import static 
org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class SlowInstanceDiagnoser extends BaseDiagnoser {
+  public static final String SLOW_INSTANCE_DIAGNOSER = "SlowInstanceDiagnoser";
   private static final Logger LOG = 
Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
+  private HealthManagerMetrics publishingMetrics;
+
+  @Inject
+  public SlowInstanceDiagnoser(HealthManagerMetrics publishingMetrics) {
+    this.publishingMetrics = publishingMetrics;
+  }
+
   @Override
   public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    publishingMetrics.executeDiagnoserIncr(SLOW_INSTANCE_DIAGNOSER);
+
     Collection<Diagnosis> diagnoses = new ArrayList<>();
     SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java
index 759e803..bf9b3cb 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java
@@ -30,6 +30,7 @@ import com.microsoft.dhalion.core.SymptomsTable;
 import com.microsoft.dhalion.events.EventManager;
 import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.apache.heron.healthmgr.HealthManagerMetrics;
 import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
 import org.apache.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
 import org.apache.heron.scheduler.client.ISchedulerClient;
@@ -38,20 +39,24 @@ import static 
org.apache.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
 import static 
org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
 
 public class RestartContainerResolver implements IResolver {
+  public static final String RESTART_CONTAINER_RESOLVER = 
"RestartContainerResolver";
   private static final Logger LOG = 
Logger.getLogger(RestartContainerResolver.class.getName());
 
   private final EventManager eventManager;
   private final String topologyName;
   private final ISchedulerClient schedulerClient;
   private ExecutionContext context;
+  private HealthManagerMetrics publishingMetrics;
 
   @Inject
   public RestartContainerResolver(@Named(CONF_TOPOLOGY_NAME) String 
topologyName,
                                   EventManager eventManager,
-                                  ISchedulerClient schedulerClient) {
+                                  ISchedulerClient schedulerClient,
+                                  HealthManagerMetrics publishingMetrics) {
     this.topologyName = topologyName;
     this.eventManager = eventManager;
     this.schedulerClient = schedulerClient;
+    this.publishingMetrics = publishingMetrics;
   }
 
   @Override
@@ -61,6 +66,8 @@ public class RestartContainerResolver implements IResolver {
 
   @Override
   public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
+    publishingMetrics.executeResolver(RESTART_CONTAINER_RESOLVER);
+
     List<Action> actions = new ArrayList<>();
 
     // find all back pressure measurements reported in this execution cycle
@@ -97,6 +104,7 @@ public class RestartContainerResolver implements IResolver {
               .setTopologyName(topologyName)
               .build());
       LOG.info("Restarted container result: " + b);
+      publishingMetrics.executeIncr("RestartContainer");
     });
 
     LOG.info("Broadcasting container restart event");
diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java
index 8cbca99..7bc7ea6 100644
--- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java
+++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java
@@ -26,6 +26,7 @@ import com.microsoft.dhalion.api.MetricsProvider;
 import com.microsoft.dhalion.core.Measurement;
 import com.microsoft.dhalion.core.MeasurementsTable;
 
+import org.apache.heron.healthmgr.HealthManagerMetrics;
 import org.apache.heron.healthmgr.HealthPolicyConfig;
 import org.apache.heron.healthmgr.common.PackingPlanProvider;
 import org.apache.heron.healthmgr.common.TopologyProvider;
@@ -33,19 +34,24 @@ import org.apache.heron.healthmgr.common.TopologyProvider;
 import static 
org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 public class BackPressureSensor extends BaseSensor {
+  public static final String BACKPRESSURE_SENSOR = "BackPressureSensor";
+
   private final MetricsProvider metricsProvider;
   private final PackingPlanProvider packingPlanProvider;
   private final TopologyProvider topologyProvider;
+  private HealthManagerMetrics publishingMetrics;
 
   @Inject
   public BackPressureSensor(PackingPlanProvider packingPlanProvider,
                             TopologyProvider topologyProvider,
                             HealthPolicyConfig policyConfig,
-                            MetricsProvider metricsProvider) {
+                            MetricsProvider metricsProvider,
+                            HealthManagerMetrics publishingMetrics) {
     super(policyConfig, METRIC_BACK_PRESSURE.text(), 
BackPressureSensor.class.getSimpleName());
     this.packingPlanProvider = packingPlanProvider;
     this.topologyProvider = topologyProvider;
     this.metricsProvider = metricsProvider;
+    this.publishingMetrics = publishingMetrics;
   }
 
   /**
@@ -55,6 +61,8 @@ public class BackPressureSensor extends BaseSensor {
    */
   @Override
   public Collection<Measurement> fetch() {
+    publishingMetrics.executeSensorIncr(BACKPRESSURE_SENSOR);
+
     Collection<Measurement> result = new ArrayList<>();
     Instant now = context.checkpoint();
 
diff --git a/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java b/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java
index 2fa1e7e..3711e6c 100644
--- a/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java
+++ b/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java
@@ -64,7 +64,7 @@ public class HealthManagerTest {
     
when(adaptor.getSchedulerLocation(anyString())).thenReturn(schedulerLocation);
 
     AbstractModule baseModule = HealthManager
-        .buildMetricsProviderModule("127.0.0.1", 
TrackerMetricsProvider.class.getName());
+        .buildBaseModule("127.0.0.1", TrackerMetricsProvider.class.getName());
 
     HealthManager healthManager = new HealthManager(config, baseModule);
 

-- 
To stop receiving notification emails like this one, please contact
hui...@apache.org.

Reply via email to