[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r179871817 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java ## @@ -15,53 +15,100 @@ package com.twitter.heron.healthmgr.detectors; +import java.time.Instant; import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.logging.Logger; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import javax.inject.Inject; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import com.google.common.annotations.VisibleForTesting; +import com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.common.MetricsStats; import com.twitter.heron.healthmgr.sensors.BaseSensor; public class SkewDetector extends BaseDetector { - private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName()); - private final BaseSensor sensor; private final double skewRatio; - private final BaseDetector.SymptomName symptomName; + private final String metricName; + private final BaseDetector.SymptomType symptomType; @Inject - SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) { -this.sensor = sensor; + SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType + symptomType) { this.skewRatio = skewRatio; -this.symptomName = symptom; +this.metricName = metricName.text(); +this.symptomType = symptomType; } /** - * Detects components experiencing data skew, instances with vastly different execute counts. 
+ * Detects components experiencing skew on a specific metric * - * @return A collection of affected components + * @return At most two symptoms corresponding to each affected component -- one for positive skew + * and one for negative skew */ @Override - public List detect() { -ArrayList result = new ArrayList<>(); - -Mapmetrics = sensor.get(); -for (ComponentMetrics compMetrics : metrics.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName()); - if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) { -LOG.info(String.format("Detected skew for %s, min = %f, max = %f", -compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax())); -result.add(new Symptom(symptomName.text(), compMetrics)); + public Collection detect(Collection measurements) { +Collection result = new ArrayList<>(); + +MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName); +Instant now = context.checkpoint(); +for (String component : metrics.uniqueComponents()) { + Set addresses = new HashSet<>(); + Set positiveAddresses = new HashSet<>(); + Set negativeAddresses = new HashSet<>(); + + double componentMax = getMaxOfAverage(metrics.component(component)); + double componentMin = getMinOfAverage(metrics.component(component)); + if (componentMax > skewRatio * componentMin) { +//there is skew +addresses.add(component); +result.add(new Symptom(symptomType.text(), now, addresses)); + +for (String instance : metrics.component(component).uniqueInstances()) { + if (metrics.instance(instance).mean() >= 0.90 * componentMax) { +positiveAddresses.add(instance); + } + if (metrics.instance(instance).mean() <= 1.10 * componentMin) { +negativeAddresses.add(instance); + } +} + +if (!positiveAddresses.isEmpty()) { + result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses)); +} +if (!negativeAddresses.isEmpty()) { + result.add(new Symptom("NEGATIVE " 
+ symptomType.text(), now, negativeAddresses)); +} } -} +} return result; } + + @VisibleForTesting + double getMaxOfAverage(MeasurementsTable table) { Review comment: sgtm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r179871874 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java ## @@ -16,58 +16,89 @@ package com.twitter.heron.healthmgr.detectors; import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.logging.Logger; import javax.inject.Inject; -import com.microsoft.dhalion.api.IDetector; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; + +import org.apache.commons.math3.stat.regression.SimpleRegression; import com.twitter.heron.healthmgr.HealthPolicyConfig; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.sensors.BufferSizeSensor; -import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE; -public class GrowingWaitQueueDetector implements IDetector { - static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; +public class GrowingWaitQueueDetector extends BaseDetector { + static final String CONF_LIMIT + = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName()); - private final BufferSizeSensor pendingBufferSensor; private final double rateLimit; @Inject - GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor, - HealthPolicyConfig 
policyConfig) { -this.pendingBufferSensor = pendingBufferSensor; + GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) { rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0); } /** * Detects all components unable to keep up with input load, hence having a growing pending buffer * or wait queue * - * @return A collection of all components executing slower than input rate. + * @return A collection of symptoms each one corresponding to a components executing slower + * than input rate. */ @Override - public List detect() { -ArrayList result = new ArrayList<>(); - -MapbufferSizes = pendingBufferSensor.get(); -for (ComponentMetrics compMetrics : bufferSizes.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - compStats.computeBufferSizeTrend(); - if (compStats.getMaxBufferChangeRate() > rateLimit) { + public Collection detect(Collection measurements) { + +Collection result = new ArrayList<>(); + +MeasurementsTable waitQueueMetrics = MeasurementsTable.of(measurements).type +(METRIC_WAIT_Q_SIZE.text()); +for (String component : waitQueueMetrics.uniqueComponents()) { + Set addresses = new HashSet<>(); + double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component)); + if (maxSlope > rateLimit) { LOG.info(String.format("Detected growing wait queues for %s, max rate %f", -compMetrics.getName(), compStats.getMaxBufferChangeRate())); -result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics)); +component, maxSlope)); +addresses.add(component); +result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses)); } } - return result; } + + + private double computeWaitQueueSizeTrend(MeasurementsTable metrics) { +double maxSlope = 0; +for (String instance : metrics.uniqueInstances()) { + + if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) { +// missing of insufficient data for creating a trend line +continue; + } + + Collection measurements = 
metrics.instance(instance).sort(false, + MeasurementsTable.SortKey + .TIME_STAMP).get(); + SimpleRegression simpleRegression = new SimpleRegression(true); + + for (Measurement m : measurements) { +simpleRegression.addData(m.instant().getEpochSecond(), m.value()); Review comment: sgtm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178395687 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java ## @@ -15,58 +15,67 @@ package com.twitter.heron.healthmgr.detectors; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.logging.Logger; import javax.inject.Inject; -import com.microsoft.dhalion.api.IDetector; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; import com.twitter.heron.healthmgr.HealthPolicyConfig; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.sensors.BackPressureSensor; -import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE; +import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE; -public class BackPressureDetector implements IDetector { - public static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis"; +public class BackPressureDetector extends BaseDetector { + static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis"; private static final Logger LOG = Logger.getLogger(BackPressureDetector.class.getName()); - private final BackPressureSensor bpSensor; private final int noiseFilterMillis; @Inject - BackPressureDetector(BackPressureSensor bpSensor, - HealthPolicyConfig 
policyConfig) { -this.bpSensor = bpSensor; + BackPressureDetector(HealthPolicyConfig policyConfig) { noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20); } /** * Detects all components initiating backpressure above the configured limit. Normally there * will be only one component * - * @return A collection of all components causing backpressure. + * @return A collection of symptoms each one corresponding to a components with backpressure. */ @Override - public List detect() { -ArrayList result = new ArrayList<>(); + public Collection detect(Collection measurements) { +Collection result = new ArrayList<>(); +Instant now = context.checkpoint(); -MapbackpressureMetrics = bpSensor.get(); -for (ComponentMetrics compMetrics : backpressureMetrics.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - compStats.computeBpStats(); - if (compStats.getTotalBackpressure() > noiseFilterMillis) { -LOG.info(String.format("Detected back pressure for %s, total back pressure is %f", -compMetrics.getName(), compStats.getTotalBackpressure())); -result.add(new Symptom(SYMPTOM_BACK_PRESSURE.text(), compMetrics)); +MeasurementsTable bpMetrics += MeasurementsTable.of(measurements).type(METRIC_BACK_PRESSURE.text()); +for (String component : bpMetrics.uniqueComponents()) { + double compBackPressure = bpMetrics.component(component).sum(); + if (compBackPressure > noiseFilterMillis) { +LOG.info(String.format("Detected component back-pressure for %s, total back pressure is %f", +component, compBackPressure)); +List addresses = Collections.singletonList(component); +result.add(new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, addresses)); + } +} +for (String instance : bpMetrics.uniqueInstances()) { Review comment: it looks like that component and instance are independent here. shall we enforce/verify that the instance should be of the above component? besides, component and instance are flat in the table/context. 
Is there a place to track the component-instance relationship? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178394376 ## File path: heron/executor/src/python/heron_executor.py ## @@ -498,7 +498,7 @@ def _get_healthmgr_cmd(self): "--cluster", self.cluster, "--role", self.role, "--environment", self.environment, - "--topology_name", self.topology_name, "--verbose"] + "--topology_name", self.topology_name] Review comment: I would suggest either keeping the -verbose flag for now until the code is stable, or aligning it with 'heron submit xxx -verbose': if the user supplies verbose on the CLI, the executor appends '-verbose'. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178399304 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java ## @@ -14,69 +14,74 @@ package com.twitter.heron.healthmgr.diagnosers; -import java.util.List; -import java.util.Map; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; import java.util.logging.Logger; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.diagnoser.Diagnosis; -import com.microsoft.dhalion.metrics.ComponentMetrics; -import com.microsoft.dhalion.metrics.InstanceMetrics; +import com.microsoft.dhalion.core.Diagnosis; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; +import com.microsoft.dhalion.core.SymptomsTable; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.common.MetricsStats; - -import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE; -import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE; -import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE; -import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW; +import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE; +import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE; public class SlowInstanceDiagnoser extends 
BaseDiagnoser { private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName()); @Override - public Diagnosis diagnose(List symptoms) { -List bpSymptoms = getBackPressureSymptoms(symptoms); -MapprocessingRateSkewComponents = -getProcessingRateSkewComponents(symptoms); -Map waitQDisparityComponents = getWaitQDisparityComponents(symptoms); + public Collection diagnose(Collection symptoms) { +Collection diagnoses = new ArrayList<>(); +SymptomsTable symptomsTable = SymptomsTable.of(symptoms); -if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty() -|| !processingRateSkewComponents.isEmpty()) { - // Since there is no back pressure or disparate wait count or similar - // execution count, no action is needed - return null; -} else if (bpSymptoms.size() > 1) { +SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text()); +if (bp.size() > 1) { // TODO handle cases where multiple detectors create back pressure symptom throw new IllegalStateException("Multiple back-pressure symptoms case"); } -ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent(); +if (bp.size() == 0) { + return diagnoses; +} +String bpComponent = bp.first().assignments().iterator().next(); -// verify wait Q disparity and back pressure for the same component exists -ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName()); -if (pendingBufferMetrics == null) { - // no wait Q disparity for the component with back pressure. 
There is no slow instance - return null; +SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text()); +SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text()); +// verify wait Q disparity, similar processing rates and back pressure for the same component +// exist +if (waitQSkew.assignment(bpComponent).size() == 0 +|| processingRateSkew.assignment(bpComponent).size() > 0) { + // TODO in a short window rate skew could exist + return diagnoses; } -ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, pendingBufferMetrics); -ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData); -compStats.computeBpStats(); -MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE); +Collection assignments = new ArrayList<>(); + +Instant newest = context.checkpoint(); +Instant oldest = context.previousCheckpoint(); +MeasurementsTable measurements = context.measurements() +.between(oldest, newest) +.component(bpComponent); -Symptom resultSymptom = null; -for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) { - double
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178397892 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java ## @@ -16,58 +16,89 @@ package com.twitter.heron.healthmgr.detectors; import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.logging.Logger; import javax.inject.Inject; -import com.microsoft.dhalion.api.IDetector; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; + +import org.apache.commons.math3.stat.regression.SimpleRegression; import com.twitter.heron.healthmgr.HealthPolicyConfig; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.sensors.BufferSizeSensor; -import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q; +import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE; -public class GrowingWaitQueueDetector implements IDetector { - static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; +public class GrowingWaitQueueDetector extends BaseDetector { + static final String CONF_LIMIT + = GrowingWaitQueueDetector.class.getSimpleName() + ".limit"; private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName()); - private final BufferSizeSensor pendingBufferSensor; private final double rateLimit; @Inject - GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor, - HealthPolicyConfig 
policyConfig) { -this.pendingBufferSensor = pendingBufferSensor; + GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) { rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0); } /** * Detects all components unable to keep up with input load, hence having a growing pending buffer * or wait queue * - * @return A collection of all components executing slower than input rate. + * @return A collection of symptoms each one corresponding to a components executing slower + * than input rate. */ @Override - public List detect() { -ArrayList result = new ArrayList<>(); - -MapbufferSizes = pendingBufferSensor.get(); -for (ComponentMetrics compMetrics : bufferSizes.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - compStats.computeBufferSizeTrend(); - if (compStats.getMaxBufferChangeRate() > rateLimit) { + public Collection detect(Collection measurements) { + +Collection result = new ArrayList<>(); + +MeasurementsTable waitQueueMetrics = MeasurementsTable.of(measurements).type +(METRIC_WAIT_Q_SIZE.text()); +for (String component : waitQueueMetrics.uniqueComponents()) { + Set addresses = new HashSet<>(); + double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component)); + if (maxSlope > rateLimit) { LOG.info(String.format("Detected growing wait queues for %s, max rate %f", -compMetrics.getName(), compStats.getMaxBufferChangeRate())); -result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics)); +component, maxSlope)); +addresses.add(component); +result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses)); } } - return result; } + + + private double computeWaitQueueSizeTrend(MeasurementsTable metrics) { +double maxSlope = 0; +for (String instance : metrics.uniqueInstances()) { + + if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) { +// missing of insufficient data for creating a trend line +continue; + } + + Collection measurements = 
metrics.instance(instance).sort(false, + MeasurementsTable.SortKey + .TIME_STAMP).get(); + SimpleRegression simpleRegression = new SimpleRegression(true); + + for (Measurement m : measurements) { +simpleRegression.addData(m.instant().getEpochSecond(), m.value()); Review comment: shall we limit the data to a recent time window? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact
[GitHub] huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
huijunwu commented on a change in pull request #2821: Update Dhalion dependency version URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178398628 ## File path: heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java ## @@ -15,53 +15,100 @@ package com.twitter.heron.healthmgr.detectors; +import java.time.Instant; import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.logging.Logger; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import javax.inject.Inject; -import com.microsoft.dhalion.detector.Symptom; -import com.microsoft.dhalion.metrics.ComponentMetrics; +import com.google.common.annotations.VisibleForTesting; +import com.microsoft.dhalion.core.Measurement; +import com.microsoft.dhalion.core.MeasurementsTable; +import com.microsoft.dhalion.core.Symptom; -import com.twitter.heron.healthmgr.common.ComponentMetricsHelper; -import com.twitter.heron.healthmgr.common.MetricsStats; import com.twitter.heron.healthmgr.sensors.BaseSensor; public class SkewDetector extends BaseDetector { - private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName()); - private final BaseSensor sensor; private final double skewRatio; - private final BaseDetector.SymptomName symptomName; + private final String metricName; + private final BaseDetector.SymptomType symptomType; @Inject - SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) { -this.sensor = sensor; + SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType + symptomType) { this.skewRatio = skewRatio; -this.symptomName = symptom; +this.metricName = metricName.text(); +this.symptomType = symptomType; } /** - * Detects components experiencing data skew, instances with vastly different execute counts. 
+ * Detects components experiencing skew on a specific metric * - * @return A collection of affected components + * @return At most two symptoms corresponding to each affected component -- one for positive skew + * and one for negative skew */ @Override - public List detect() { -ArrayList result = new ArrayList<>(); - -Mapmetrics = sensor.get(); -for (ComponentMetrics compMetrics : metrics.values()) { - ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics); - MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName()); - if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) { -LOG.info(String.format("Detected skew for %s, min = %f, max = %f", -compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax())); -result.add(new Symptom(symptomName.text(), compMetrics)); + public Collection detect(Collection measurements) { +Collection result = new ArrayList<>(); + +MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName); +Instant now = context.checkpoint(); +for (String component : metrics.uniqueComponents()) { + Set addresses = new HashSet<>(); + Set positiveAddresses = new HashSet<>(); + Set negativeAddresses = new HashSet<>(); + + double componentMax = getMaxOfAverage(metrics.component(component)); + double componentMin = getMinOfAverage(metrics.component(component)); + if (componentMax > skewRatio * componentMin) { +//there is skew +addresses.add(component); +result.add(new Symptom(symptomType.text(), now, addresses)); + +for (String instance : metrics.component(component).uniqueInstances()) { + if (metrics.instance(instance).mean() >= 0.90 * componentMax) { +positiveAddresses.add(instance); + } + if (metrics.instance(instance).mean() <= 1.10 * componentMin) { +negativeAddresses.add(instance); + } +} + +if (!positiveAddresses.isEmpty()) { + result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses)); +} +if (!negativeAddresses.isEmpty()) { + result.add(new Symptom("NEGATIVE " 
+ symptomType.text(), now, negativeAddresses)); +} } -} +} return result; } + + @VisibleForTesting + double getMaxOfAverage(MeasurementsTable table) { Review comment: Why not put the two helper methods in `MeasurementsTable`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services