[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-04-06 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r179871817
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
 ##
 @@ -15,53 +15,100 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
 import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class SkewDetector extends BaseDetector {
-  private static final Logger LOG = 
Logger.getLogger(SkewDetector.class.getName());
-  private final BaseSensor sensor;
   private final double skewRatio;
-  private final BaseDetector.SymptomName symptomName;
+  private final String metricName;
+  private final BaseDetector.SymptomType symptomType;
 
   @Inject
-  SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName 
symptom) {
-this.sensor = sensor;
+  SkewDetector(double skewRatio, BaseSensor.MetricName metricName, 
BaseDetector.SymptomType
+  symptomType) {
 this.skewRatio = skewRatio;
-this.symptomName = symptom;
+this.metricName = metricName.text();
+this.symptomType = symptomType;
   }
 
   /**
-   * Detects components experiencing data skew, instances with vastly 
different execute counts.
+   * Detects components experiencing skew on a specific metric
*
-   * @return A collection of affected components
+   * @return At most two symptoms corresponding to each affected component -- 
one for positive skew
+   * and one for negative skew
*/
   @Override
-  public List detect() {
-ArrayList result = new ArrayList<>();
-
-Map metrics = sensor.get();
-for (ComponentMetrics compMetrics : metrics.values()) {
-  ComponentMetricsHelper compStats = new 
ComponentMetricsHelper(compMetrics);
-  MetricsStats stats = 
compStats.computeMinMaxStats(sensor.getMetricName());
-  if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
-LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
-compMetrics.getName(), stats.getMetricMin(), 
stats.getMetricMax()));
-result.add(new Symptom(symptomName.text(), compMetrics));
+  public Collection detect(Collection measurements) {
+Collection result = new ArrayList<>();
+
+MeasurementsTable metrics = 
MeasurementsTable.of(measurements).type(metricName);
+Instant now = context.checkpoint();
+for (String component : metrics.uniqueComponents()) {
+  Set addresses = new HashSet<>();
+  Set positiveAddresses = new HashSet<>();
+  Set negativeAddresses = new HashSet<>();
+
+  double componentMax = getMaxOfAverage(metrics.component(component));
+  double componentMin = getMinOfAverage(metrics.component(component));
+  if (componentMax > skewRatio * componentMin) {
+//there is skew
+addresses.add(component);
+result.add(new Symptom(symptomType.text(), now, addresses));
+
+for (String instance : metrics.component(component).uniqueInstances()) 
{
+  if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+positiveAddresses.add(instance);
+  }
+  if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+negativeAddresses.add(instance);
+  }
+}
+
+if (!positiveAddresses.isEmpty()) {
+  result.add(new Symptom("POSITIVE " + symptomType.text(), now, 
positiveAddresses));
+}
+if (!negativeAddresses.isEmpty()) {
+  result.add(new Symptom("NEGATIVE " + symptomType.text(), now, 
negativeAddresses));
+}
   }
-}
 
+}
 return result;
   }
+
+  @VisibleForTesting
+  double getMaxOfAverage(MeasurementsTable table) {
 
 Review comment:
   sgtm


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-04-06 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r179871874
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
 ##
 @@ -16,58 +16,89 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q;
+import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 
-public class GrowingWaitQueueDetector implements IDetector {
-  static final String CONF_LIMIT = 
GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
+public class GrowingWaitQueueDetector extends BaseDetector {
+  static final String CONF_LIMIT
+  = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
 
   private static final Logger LOG = 
Logger.getLogger(GrowingWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final double rateLimit;
 
   @Inject
-  GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-   HealthPolicyConfig policyConfig) {
-this.pendingBufferSensor = pendingBufferSensor;
+  GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) {
 rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0);
   }
 
   /**
* Detects all components unable to keep up with input load, hence having a 
growing pending buffer
* or wait queue
*
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms each one corresponding to a components 
executing slower
+   * than input rate.
*/
   @Override
-  public List detect() {
-ArrayList result = new ArrayList<>();
-
-Map bufferSizes = pendingBufferSensor.get();
-for (ComponentMetrics compMetrics : bufferSizes.values()) {
-  ComponentMetricsHelper compStats = new 
ComponentMetricsHelper(compMetrics);
-  compStats.computeBufferSizeTrend();
-  if (compStats.getMaxBufferChangeRate() > rateLimit) {
+  public Collection detect(Collection measurements) {
+
+Collection result = new ArrayList<>();
+
+MeasurementsTable waitQueueMetrics = 
MeasurementsTable.of(measurements).type
+(METRIC_WAIT_Q_SIZE.text());
+for (String component : waitQueueMetrics.uniqueComponents()) {
+  Set addresses = new HashSet<>();
+  double maxSlope = 
computeWaitQueueSizeTrend(waitQueueMetrics.component(component));
+  if (maxSlope > rateLimit) {
 LOG.info(String.format("Detected growing wait queues for %s, max rate 
%f",
-compMetrics.getName(), compStats.getMaxBufferChangeRate()));
-result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics));
+component, maxSlope));
+addresses.add(component);
+result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), 
context.checkpoint(), addresses));
   }
 }
-
 return result;
   }
+
+
+  private double computeWaitQueueSizeTrend(MeasurementsTable metrics) {
+double maxSlope = 0;
+for (String instance : metrics.uniqueInstances()) {
+
+  if (metrics.instance(instance) == null || 
metrics.instance(instance).size() < 3) {
+// missing of insufficient data for creating a trend line
+continue;
+  }
+
+  Collection measurements = 
metrics.instance(instance).sort(false,
+  MeasurementsTable.SortKey
+  .TIME_STAMP).get();
+  SimpleRegression simpleRegression = new SimpleRegression(true);
+
+  for (Measurement m : measurements) {
+simpleRegression.addData(m.instant().getEpochSecond(), m.value());
 
 Review comment:
   sgtm


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-03-30 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178395687
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
 ##
 @@ -15,58 +15,67 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
-import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
+import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
-public class BackPressureDetector implements IDetector {
-  public static final String CONF_NOISE_FILTER = 
"BackPressureDetector.noiseFilterMillis";
+public class BackPressureDetector extends BaseDetector {
+  static final String CONF_NOISE_FILTER = 
"BackPressureDetector.noiseFilterMillis";
 
   private static final Logger LOG = 
Logger.getLogger(BackPressureDetector.class.getName());
-  private final BackPressureSensor bpSensor;
   private final int noiseFilterMillis;
 
   @Inject
-  BackPressureDetector(BackPressureSensor bpSensor,
-   HealthPolicyConfig policyConfig) {
-this.bpSensor = bpSensor;
+  BackPressureDetector(HealthPolicyConfig policyConfig) {
 noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
   }
 
   /**
* Detects all components initiating backpressure above the configured 
limit. Normally there
* will be only one component
*
-   * @return A collection of all components causing backpressure.
+   * @return A collection of symptoms each one corresponding to a components 
with backpressure.
*/
   @Override
-  public List detect() {
-ArrayList result = new ArrayList<>();
+  public Collection detect(Collection measurements) {
+Collection result = new ArrayList<>();
+Instant now = context.checkpoint();
 
-Map backpressureMetrics = bpSensor.get();
-for (ComponentMetrics compMetrics : backpressureMetrics.values()) {
-  ComponentMetricsHelper compStats = new 
ComponentMetricsHelper(compMetrics);
-  compStats.computeBpStats();
-  if (compStats.getTotalBackpressure() > noiseFilterMillis) {
-LOG.info(String.format("Detected back pressure for %s, total back 
pressure is %f",
-compMetrics.getName(), compStats.getTotalBackpressure()));
-result.add(new Symptom(SYMPTOM_BACK_PRESSURE.text(), compMetrics));
+MeasurementsTable bpMetrics
+= MeasurementsTable.of(measurements).type(METRIC_BACK_PRESSURE.text());
+for (String component : bpMetrics.uniqueComponents()) {
+  double compBackPressure = bpMetrics.component(component).sum();
+  if (compBackPressure > noiseFilterMillis) {
+LOG.info(String.format("Detected component back-pressure for %s, total 
back pressure is %f",
+component, compBackPressure));
+List addresses = Collections.singletonList(component);
+result.add(new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, 
addresses));
+  }
+}
+for (String instance : bpMetrics.uniqueInstances()) {
 
 Review comment:
   It looks like component and instance are handled independently here.
   Shall we enforce/verify that the instance belongs to the above component?
   
   Besides, components and instances are flat in the table/context. Is there a 
place to track the component-instance relation?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-03-30 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178394376
 
 

 ##
 File path: heron/executor/src/python/heron_executor.py
 ##
 @@ -498,7 +498,7 @@ def _get_healthmgr_cmd(self):
  "--cluster", self.cluster,
  "--role", self.role,
  "--environment", self.environment,
- "--topology_name", self.topology_name, "--verbose"]
+ "--topology_name", self.topology_name]
 
 Review comment:
   I would suggest either
   keeping `--verbose` for now, until the code is stable,
   or
   aligning it with `heron submit xxx --verbose`: if the user supplies 
`--verbose` on the CLI, the executor appends `--verbose`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-03-30 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178399304
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
 ##
 @@ -14,69 +14,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class SlowInstanceDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = 
Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List symptoms) {
-List bpSymptoms = getBackPressureSymptoms(symptoms);
-Map processingRateSkewComponents =
-getProcessingRateSkewComponents(symptoms);
-Map waitQDisparityComponents = 
getWaitQDisparityComponents(symptoms);
+  public Collection diagnose(Collection symptoms) {
+Collection diagnoses = new ArrayList<>();
+SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
-|| !processingRateSkewComponents.isEmpty()) {
-  // Since there is no back pressure or disparate wait count or similar
-  // execution count, no action is needed
-  return null;
-} else if (bpSymptoms.size() > 1) {
+SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+if (bp.size() > 1) {
   // TODO handle cases where multiple detectors create back pressure 
symptom
   throw new IllegalStateException("Multiple back-pressure symptoms case");
 }
-ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+if (bp.size() == 0) {
+  return diagnoses;
+}
+String bpComponent = bp.first().assignments().iterator().next();
 
-// verify wait Q disparity and back pressure for the same component exists
-ComponentMetrics pendingBufferMetrics = 
waitQDisparityComponents.get(bpMetrics.getName());
-if (pendingBufferMetrics == null) {
-  // no wait Q disparity for the component with back pressure. There is no 
slow instance
-  return null;
+SymptomsTable processingRateSkew = 
symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+SymptomsTable waitQSkew = 
symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+// verify wait Q disparity, similar processing rates and back pressure for 
the same component
+// exist
+if (waitQSkew.assignment(bpComponent).size() == 0
+|| processingRateSkew.assignment(bpComponent).size() > 0) {
+  // TODO in a short window rate skew could exist
+  return diagnoses;
 }
 
-ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, 
pendingBufferMetrics);
-ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-compStats.computeBpStats();
-MetricsStats bufferStats = 
compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+Collection assignments = new ArrayList<>();
+
+Instant newest = context.checkpoint();
+Instant oldest = context.previousCheckpoint();
+MeasurementsTable measurements = context.measurements()
+.between(oldest, newest)
+.component(bpComponent);
 
-Symptom resultSymptom = null;
-for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-  double 

[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-03-30 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178397892
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
 ##
 @@ -16,58 +16,89 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q;
+import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 
-public class GrowingWaitQueueDetector implements IDetector {
-  static final String CONF_LIMIT = 
GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
+public class GrowingWaitQueueDetector extends BaseDetector {
+  static final String CONF_LIMIT
+  = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
 
   private static final Logger LOG = 
Logger.getLogger(GrowingWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final double rateLimit;
 
   @Inject
-  GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-   HealthPolicyConfig policyConfig) {
-this.pendingBufferSensor = pendingBufferSensor;
+  GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) {
 rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0);
   }
 
   /**
* Detects all components unable to keep up with input load, hence having a 
growing pending buffer
* or wait queue
*
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms each one corresponding to a components 
executing slower
+   * than input rate.
*/
   @Override
-  public List detect() {
-ArrayList result = new ArrayList<>();
-
-Map bufferSizes = pendingBufferSensor.get();
-for (ComponentMetrics compMetrics : bufferSizes.values()) {
-  ComponentMetricsHelper compStats = new 
ComponentMetricsHelper(compMetrics);
-  compStats.computeBufferSizeTrend();
-  if (compStats.getMaxBufferChangeRate() > rateLimit) {
+  public Collection detect(Collection measurements) {
+
+Collection result = new ArrayList<>();
+
+MeasurementsTable waitQueueMetrics = 
MeasurementsTable.of(measurements).type
+(METRIC_WAIT_Q_SIZE.text());
+for (String component : waitQueueMetrics.uniqueComponents()) {
+  Set addresses = new HashSet<>();
+  double maxSlope = 
computeWaitQueueSizeTrend(waitQueueMetrics.component(component));
+  if (maxSlope > rateLimit) {
 LOG.info(String.format("Detected growing wait queues for %s, max rate 
%f",
-compMetrics.getName(), compStats.getMaxBufferChangeRate()));
-result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics));
+component, maxSlope));
+addresses.add(component);
+result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), 
context.checkpoint(), addresses));
   }
 }
-
 return result;
   }
+
+
+  private double computeWaitQueueSizeTrend(MeasurementsTable metrics) {
+double maxSlope = 0;
+for (String instance : metrics.uniqueInstances()) {
+
+  if (metrics.instance(instance) == null || 
metrics.instance(instance).size() < 3) {
+// missing of insufficient data for creating a trend line
+continue;
+  }
+
+  Collection measurements = 
metrics.instance(instance).sort(false,
+  MeasurementsTable.SortKey
+  .TIME_STAMP).get();
+  SimpleRegression simpleRegression = new SimpleRegression(true);
+
+  for (Measurement m : measurements) {
+simpleRegression.addData(m.instant().getEpochSecond(), m.value());
 
 Review comment:
   Shall we limit the data fed into the regression to a recent time window?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact 

[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version

2018-03-30 Thread GitBox
huijunwu commented on a change in pull request #2821: Update Dhalion dependency 
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178398628
 
 

 ##
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
 ##
 @@ -15,53 +15,100 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
 import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class SkewDetector extends BaseDetector {
-  private static final Logger LOG = 
Logger.getLogger(SkewDetector.class.getName());
-  private final BaseSensor sensor;
   private final double skewRatio;
-  private final BaseDetector.SymptomName symptomName;
+  private final String metricName;
+  private final BaseDetector.SymptomType symptomType;
 
   @Inject
-  SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName 
symptom) {
-this.sensor = sensor;
+  SkewDetector(double skewRatio, BaseSensor.MetricName metricName, 
BaseDetector.SymptomType
+  symptomType) {
 this.skewRatio = skewRatio;
-this.symptomName = symptom;
+this.metricName = metricName.text();
+this.symptomType = symptomType;
   }
 
   /**
-   * Detects components experiencing data skew, instances with vastly 
different execute counts.
+   * Detects components experiencing skew on a specific metric
*
-   * @return A collection of affected components
+   * @return At most two symptoms corresponding to each affected component -- 
one for positive skew
+   * and one for negative skew
*/
   @Override
-  public List detect() {
-ArrayList result = new ArrayList<>();
-
-Map metrics = sensor.get();
-for (ComponentMetrics compMetrics : metrics.values()) {
-  ComponentMetricsHelper compStats = new 
ComponentMetricsHelper(compMetrics);
-  MetricsStats stats = 
compStats.computeMinMaxStats(sensor.getMetricName());
-  if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
-LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
-compMetrics.getName(), stats.getMetricMin(), 
stats.getMetricMax()));
-result.add(new Symptom(symptomName.text(), compMetrics));
+  public Collection detect(Collection measurements) {
+Collection result = new ArrayList<>();
+
+MeasurementsTable metrics = 
MeasurementsTable.of(measurements).type(metricName);
+Instant now = context.checkpoint();
+for (String component : metrics.uniqueComponents()) {
+  Set addresses = new HashSet<>();
+  Set positiveAddresses = new HashSet<>();
+  Set negativeAddresses = new HashSet<>();
+
+  double componentMax = getMaxOfAverage(metrics.component(component));
+  double componentMin = getMinOfAverage(metrics.component(component));
+  if (componentMax > skewRatio * componentMin) {
+//there is skew
+addresses.add(component);
+result.add(new Symptom(symptomType.text(), now, addresses));
+
+for (String instance : metrics.component(component).uniqueInstances()) 
{
+  if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+positiveAddresses.add(instance);
+  }
+  if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+negativeAddresses.add(instance);
+  }
+}
+
+if (!positiveAddresses.isEmpty()) {
+  result.add(new Symptom("POSITIVE " + symptomType.text(), now, 
positiveAddresses));
+}
+if (!negativeAddresses.isEmpty()) {
+  result.add(new Symptom("NEGATIVE " + symptomType.text(), now, 
negativeAddresses));
+}
   }
-}
 
+}
 return result;
   }
+
+  @VisibleForTesting
+  double getMaxOfAverage(MeasurementsTable table) {
 
 Review comment:
   Why not put the two helper methods (`getMaxOfAverage` and `getMinOfAverage`) in `MeasurementsTable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services