This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1feab  [TE] detection - merger max duration (#3915)
6a1feab is described below

commit 6a1feab19d02faf9ca70a7685ce19a13135159ee
Author: Jihao Zhang <jihzh...@linkedin.com>
AuthorDate: Wed Mar 6 14:33:36 2019 -0800

    [TE] detection - merger max duration (#3915)
    
    Split an anomaly if its duration is longer than the max duration
---
 .../thirdeye/detection/algorithm/MergeWrapper.java | 56 +++++++++++++++++++++-
 .../wrapper/ChildKeepingMergeWrapper.java          | 25 ++--------
 .../detection/algorithm/MergeWrapperTest.java      | 51 +++++++++++++++-----
 .../wrapper/ChildKeepingMergeWrapperTest.java      |  4 +-
 4 files changed, 100 insertions(+), 36 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 1ad6c8e..01ca092 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -86,6 +86,7 @@ public class MergeWrapper extends DetectionPipeline {
 
     this.maxGap = MapUtils.getLongValue(config.getProperties(), "maxGap", 0);
     this.maxDuration = MapUtils.getLongValue(config.getProperties(), 
"maxDuration", Long.MAX_VALUE);
+    Preconditions.checkArgument(this.maxDuration > 0, "Max duration must be a 
positive number.");
     this.slice = new AnomalySlice().withStart(startTime).withEnd(endTime);
     this.nestedProperties = new ArrayList<>();
     List<Map<String, Object>> nested = 
ConfigUtils.getList(config.getProperties().get(PROP_NESTED));
@@ -142,7 +143,7 @@ public class MergeWrapper extends DetectionPipeline {
 
   // logic to do time-based merging.
   protected List<MergedAnomalyResultDTO> 
merge(Collection<MergedAnomalyResultDTO> anomalies) {
-    List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
+    List<MergedAnomalyResultDTO> input = new 
ArrayList<>(enforceMaxDuration(anomalies));
     Collections.sort(input, COMPARATOR);
 
     List<MergedAnomalyResultDTO> output = new ArrayList<>();
@@ -187,6 +188,40 @@ public class MergeWrapper extends DetectionPipeline {
     return output;
   }
 
+  /*
+    Make sure that each anomaly generated by the detector is no longer than 
maxDuration. Otherwise, split the anomaly.
+   */
+  private Collection<MergedAnomalyResultDTO> 
enforceMaxDuration(Collection<MergedAnomalyResultDTO> anomalies) {
+    Set<MergedAnomalyResultDTO> result = new HashSet<>();
+    for (MergedAnomalyResultDTO anomaly : anomalies) {
+      if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) {
+        result.addAll(splitAnomaly(anomaly, this.maxDuration));
+      } else {
+        result.add(anomaly);
+      }
+    }
+    return result;
+  }
+
+  /*
+    Split the anomaly into multiple consecutive anomalies, each with a duration no 
longer than the max allowed duration.
+  */
+  private Collection<MergedAnomalyResultDTO> 
splitAnomaly(MergedAnomalyResultDTO anomaly, long maxDuration) {
+    int anomalyCountAfterSplit = (int) Math.ceil((anomaly.getEndTime() - 
anomaly.getStartTime()) / (double) maxDuration);
+    Set<MergedAnomalyResultDTO> result = new HashSet<>();
+
+    long nextStartTime = anomaly.getStartTime();
+    for (int i = 0; i < anomalyCountAfterSplit; i++) {
+      MergedAnomalyResultDTO splitedAnomaly = copyAnomalyInfo(anomaly, new 
MergedAnomalyResultDTO());
+      splitedAnomaly.setStartTime(nextStartTime);
+      splitedAnomaly.setEndTime(Math.min(nextStartTime + maxDuration, 
anomaly.getEndTime()));
+      nextStartTime = splitedAnomaly.getEndTime();
+      result.add(splitedAnomaly);
+    }
+    return result;
+  }
+
+
   protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
     long time = this.startTime;
     for (MergedAnomalyResultDTO anomaly : anomalies) {
@@ -203,6 +238,25 @@ public class MergeWrapper extends DetectionPipeline {
     return time;
   }
 
+
+  protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO 
anomaly, MergedAnomalyResultDTO newAnomaly) {
+    newAnomaly.setStartTime(anomaly.getStartTime());
+    newAnomaly.setEndTime(anomaly.getEndTime());
+    newAnomaly.setMetric(anomaly.getMetric());
+    newAnomaly.setMetricUrn(anomaly.getMetricUrn());
+    newAnomaly.setCollection(anomaly.getCollection());
+    newAnomaly.setDimensions(anomaly.getDimensions());
+    newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId());
+    newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource());
+    newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal());
+    newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal());
+    newAnomaly.setFeedback(anomaly.getFeedback());
+    newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
+    newAnomaly.setScore(anomaly.getScore());
+    newAnomaly.setWeight(anomaly.getWeight());
+    return newAnomaly;
+  }
+
   protected static class AnomalyKey {
     final String metric;
     final String collection;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 4a997ec..a6ab276 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,16 +20,16 @@
 package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.collect.Collections2;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
 
 
 /**
@@ -98,21 +98,4 @@ public class ChildKeepingMergeWrapper extends 
BaselineFillingMergeWrapper {
     return output;
   }
 
-  private MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO 
anomaly, MergedAnomalyResultDTO newAnomaly) {
-    newAnomaly.setStartTime(anomaly.getStartTime());
-    newAnomaly.setEndTime(anomaly.getEndTime());
-    newAnomaly.setMetric(anomaly.getMetric());
-    newAnomaly.setMetricUrn(anomaly.getMetricUrn());
-    newAnomaly.setCollection(anomaly.getCollection());
-    newAnomaly.setDimensions(anomaly.getDimensions());
-    newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId());
-    newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource());
-    newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal());
-    newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal());
-    newAnomaly.setFeedback(anomaly.getFeedback());
-    newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
-    newAnomaly.setScore(anomaly.getScore());
-    newAnomaly.setWeight(anomaly.getWeight());
-    return newAnomaly;
-  }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index bffc3af..f8c3357 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -16,14 +16,6 @@
 
 package org.apache.pinot.thirdeye.detection.algorithm;
 
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.detection.MockPipeline;
-import org.apache.pinot.thirdeye.detection.MockPipelineLoader;
-import org.apache.pinot.thirdeye.detection.MockPipelineOutput;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,6 +24,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.MockPipeline;
+import org.apache.pinot.thirdeye.detection.MockPipelineLoader;
+import org.apache.pinot.thirdeye.detection.MockPipelineOutput;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -111,12 +111,10 @@ public class MergeWrapperTest {
 
   @Test
   public void testMergerPassthru() throws Exception {
-    this.config.getProperties().put(PROP_MAX_DURATION, 0);
-
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 6);
+    Assert.assertEquals(output.getAnomalies().size(), 5);
     Assert.assertEquals(output.getLastTimestamp(), 2900);
   }
 
@@ -204,6 +202,37 @@ public class MergeWrapperTest {
   }
 
   @Test
+  public void testMergerMaxDurationEnforce() throws Exception {
+    this.config.getProperties().put(PROP_MAX_DURATION, 500);
+
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(
+        makeAnomaly(2800, 3800),
+        makeAnomaly(3500, 3600)
+    ), 3700));
+
+    Map<String, Object> nestedProperties = new HashMap<>();
+    nestedProperties.put(PROP_CLASS_NAME, "none");
+    nestedProperties.put(PROP_METRIC_URN, "thirdeye:metric:3");
+
+    this.nestedProperties.add(nestedProperties);
+
+    this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
+    DetectionPipelineResult output = this.wrapper.run();
+
+    Assert.assertEquals(output.getAnomalies().size(), 8);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 500)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(500, 1000)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3400, 3800)));
+  }
+
+
+  @Test
   public void testMergerExecution() throws Exception {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     this.wrapper.run();
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index b39b865..c9f44b4 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -106,12 +106,10 @@ public class ChildKeepingMergeWrapperTest {
 
   @Test
   public void testMergerPassthru() throws Exception {
-    this.config.getProperties().put(PROP_MAX_DURATION, 0);
-
     this.wrapper = new ChildKeepingMergeWrapper(this.provider, this.config, 
1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 6);
+    Assert.assertEquals(output.getAnomalies().size(), 5);
     Assert.assertEquals(output.getLastTimestamp(), 2900);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to