This is an automated email from the ASF dual-hosted git repository. jihao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6a1feab [TE] detection - merger max duration (#3915) 6a1feab is described below commit 6a1feab19d02faf9ca70a7685ce19a13135159ee Author: Jihao Zhang <jihzh...@linkedin.com> AuthorDate: Wed Mar 6 14:33:36 2019 -0800 [TE] detection - merger max duration (#3915) Split the anomaly if it's larger than the max duration --- .../thirdeye/detection/algorithm/MergeWrapper.java | 56 +++++++++++++++++++++- .../wrapper/ChildKeepingMergeWrapper.java | 25 ++-------- .../detection/algorithm/MergeWrapperTest.java | 51 +++++++++++++++----- .../wrapper/ChildKeepingMergeWrapperTest.java | 4 +- 4 files changed, 100 insertions(+), 36 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java index 1ad6c8e..01ca092 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java @@ -86,6 +86,7 @@ public class MergeWrapper extends DetectionPipeline { this.maxGap = MapUtils.getLongValue(config.getProperties(), "maxGap", 0); this.maxDuration = MapUtils.getLongValue(config.getProperties(), "maxDuration", Long.MAX_VALUE); + Preconditions.checkArgument(this.maxDuration > 0, "Max duration must be a positive number."); this.slice = new AnomalySlice().withStart(startTime).withEnd(endTime); this.nestedProperties = new ArrayList<>(); List<Map<String, Object>> nested = ConfigUtils.getList(config.getProperties().get(PROP_NESTED)); @@ -142,7 +143,7 @@ public class MergeWrapper extends DetectionPipeline { // logic to do time-based merging. 
protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) { - List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies); + List<MergedAnomalyResultDTO> input = new ArrayList<>(enforceMaxDuration(anomalies)); Collections.sort(input, COMPARATOR); List<MergedAnomalyResultDTO> output = new ArrayList<>(); @@ -187,6 +188,40 @@ public class MergeWrapper extends DetectionPipeline { return output; } + /* + Make sure that the anomalies generated from detector is shorter than maxDuration. Otherwise, split the anomaly + */ + private Collection<MergedAnomalyResultDTO> enforceMaxDuration(Collection<MergedAnomalyResultDTO> anomalies) { + Set<MergedAnomalyResultDTO> result = new HashSet<>(); + for (MergedAnomalyResultDTO anomaly : anomalies) { + if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) { + result.addAll(splitAnomaly(anomaly, this.maxDuration)); + } else { + result.add(anomaly); + } + } + return result; + } + + /* + Split the anomaly into multiple consecutive anomalies with duration less than the max allowed duration. 
+ */ + private Collection<MergedAnomalyResultDTO> splitAnomaly(MergedAnomalyResultDTO anomaly, long maxDuration) { + int anomalyCountAfterSplit = (int) Math.ceil((anomaly.getEndTime() - anomaly.getStartTime()) / (double) maxDuration); + Set<MergedAnomalyResultDTO> result = new HashSet<>(); + + long nextStartTime = anomaly.getStartTime(); + for (int i = 0; i < anomalyCountAfterSplit; i++) { + MergedAnomalyResultDTO splitedAnomaly = copyAnomalyInfo(anomaly, new MergedAnomalyResultDTO()); + splitedAnomaly.setStartTime(nextStartTime); + splitedAnomaly.setEndTime(Math.min(nextStartTime + maxDuration, anomaly.getEndTime())); + nextStartTime = splitedAnomaly.getEndTime(); + result.add(splitedAnomaly); + } + return result; + } + + protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) { long time = this.startTime; for (MergedAnomalyResultDTO anomaly : anomalies) { @@ -203,6 +238,25 @@ public class MergeWrapper extends DetectionPipeline { return time; } + + protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO anomaly, MergedAnomalyResultDTO newAnomaly) { + newAnomaly.setStartTime(anomaly.getStartTime()); + newAnomaly.setEndTime(anomaly.getEndTime()); + newAnomaly.setMetric(anomaly.getMetric()); + newAnomaly.setMetricUrn(anomaly.getMetricUrn()); + newAnomaly.setCollection(anomaly.getCollection()); + newAnomaly.setDimensions(anomaly.getDimensions()); + newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId()); + newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource()); + newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal()); + newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal()); + newAnomaly.setFeedback(anomaly.getFeedback()); + newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId()); + newAnomaly.setScore(anomaly.getScore()); + newAnomaly.setWeight(anomaly.getWeight()); + return newAnomaly; + } + protected static class AnomalyKey { final String metric; final String collection; diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java index 4a997ec..a6ab276 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java @@ -20,16 +20,16 @@ package org.apache.pinot.thirdeye.detection.wrapper; import com.google.common.collect.Collections2; -import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; -import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; -import org.apache.pinot.thirdeye.detection.DataProvider; -import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.detection.DataProvider; +import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper; /** @@ -98,21 +98,4 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper { return output; } - private MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO anomaly, MergedAnomalyResultDTO newAnomaly) { - newAnomaly.setStartTime(anomaly.getStartTime()); - newAnomaly.setEndTime(anomaly.getEndTime()); - newAnomaly.setMetric(anomaly.getMetric()); - newAnomaly.setMetricUrn(anomaly.getMetricUrn()); - newAnomaly.setCollection(anomaly.getCollection()); - newAnomaly.setDimensions(anomaly.getDimensions()); - newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId()); - newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource()); - 
newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal()); - newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal()); - newAnomaly.setFeedback(anomaly.getFeedback()); - newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId()); - newAnomaly.setScore(anomaly.getScore()); - newAnomaly.setWeight(anomaly.getWeight()); - return newAnomaly; - } } diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java index bffc3af..f8c3357 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java @@ -16,14 +16,6 @@ package org.apache.pinot.thirdeye.detection.algorithm; -import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; -import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; -import org.apache.pinot.thirdeye.detection.DataProvider; -import org.apache.pinot.thirdeye.detection.DetectionPipelineResult; -import org.apache.pinot.thirdeye.detection.MockDataProvider; -import org.apache.pinot.thirdeye.detection.MockPipeline; -import org.apache.pinot.thirdeye.detection.MockPipelineLoader; -import org.apache.pinot.thirdeye.detection.MockPipelineOutput; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,6 +24,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.detection.DataProvider; +import org.apache.pinot.thirdeye.detection.DetectionPipelineResult; +import org.apache.pinot.thirdeye.detection.MockDataProvider; +import 
org.apache.pinot.thirdeye.detection.MockPipeline; +import org.apache.pinot.thirdeye.detection.MockPipelineLoader; +import org.apache.pinot.thirdeye.detection.MockPipelineOutput; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -111,12 +111,10 @@ public class MergeWrapperTest { @Test public void testMergerPassthru() throws Exception { - this.config.getProperties().put(PROP_MAX_DURATION, 0); - this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 6); + Assert.assertEquals(output.getAnomalies().size(), 5); Assert.assertEquals(output.getLastTimestamp(), 2900); } @@ -204,6 +202,37 @@ public class MergeWrapperTest { } @Test + public void testMergerMaxDurationEnforce() throws Exception { + this.config.getProperties().put(PROP_MAX_DURATION, 500); + + this.outputs.add(new MockPipelineOutput(Arrays.asList( + makeAnomaly(2800, 3800), + makeAnomaly(3500, 3600) + ), 3700)); + + Map<String, Object> nestedProperties = new HashMap<>(); + nestedProperties.put(PROP_CLASS_NAME, "none"); + nestedProperties.put(PROP_METRIC_URN, "thirdeye:metric:3"); + + this.nestedProperties.add(nestedProperties); + + this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000); + DetectionPipelineResult output = this.wrapper.run(); + + Assert.assertEquals(output.getAnomalies().size(), 8); + Assert.assertEquals(output.getLastTimestamp(), 2900); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 500))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(500, 1000))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900))); + 
Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3400, 3800))); + } + + + @Test public void testMergerExecution() throws Exception { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); this.wrapper.run(); diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java index b39b865..c9f44b4 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java @@ -106,12 +106,10 @@ public class ChildKeepingMergeWrapperTest { @Test public void testMergerPassthru() throws Exception { - this.config.getProperties().put(PROP_MAX_DURATION, 0); - this.wrapper = new ChildKeepingMergeWrapper(this.provider, this.config, 1000, 3000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 6); + Assert.assertEquals(output.getAnomalies().size(), 5); Assert.assertEquals(output.getLastTimestamp(), 2900); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org