This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch update_merge_logic
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 48969567d08c706a66c7bcbea5365bf32300cd74
Author: Xiaohui Sun <xh...@xhsun-mn3.linkedin.biz>
AuthorDate: Fri May 10 14:25:00 2019 -0700

    [TE] Update anomaly merge logic
    1. Don't output existing anomalies that did not get merged.
    2. Merge new anomaly's properties into existing anomaly.
    3. Do not split existing anomalies.
---
 .../thirdeye/detection/algorithm/MergeWrapper.java |  73 +++++++++---
 .../thirdeye/detection/DetectionTestUtils.java     |   6 +
 .../detection/algorithm/MergeWrapperTest.java      | 125 ++++++++++++---------
 3 files changed, 137 insertions(+), 67 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 5acf958..fc64501 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
@@ -146,11 +147,18 @@ public class MergeWrapper extends DetectionPipeline {
     return new 
ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), 
this.config.getId()).get(effectiveSlice));
   }
 
-  // logic to do time-based merging.
+  private boolean isExistingAnomaly(MergedAnomalyResultDTO anomaly) {
+    return anomaly.getId() != null;
+  }
+
+  // Merge new anomalies into existing anomalies. Return the anomalies that 
need to be updated or added.
+  // An existing anomaly that was not updated is not returned.
   protected List<MergedAnomalyResultDTO> 
merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new 
ArrayList<>(enforceMaxDuration(anomalies));
     Collections.sort(input, COMPARATOR);
 
+    // stores the ids of all the existing anomalies that were modified
+    Set<Long> modifiedExistingIds = new HashSet<>();
     List<MergedAnomalyResultDTO> output = new ArrayList<>();
 
     Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
@@ -163,16 +171,38 @@ public class MergeWrapper extends DetectionPipeline {
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > 
this.maxGap) {
-        // no parent, too far away
+        // no parent, too far away to merge
+        //      parent |-------------|
+        //                                  anomaly |---------------|
+        //
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
       } else if (anomaly.getEndTime() <= parent.getEndTime() || 
anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) {
-        // fully merge into existing
+        // fully cover
+        //      parent |-------------------|
+        //              anomaly |-------------|
+        // or mergeable
+        //      parent |-------------------|
+        //                      anomaly |-------------|
+        // or small gap
+        //      parent |-------------------|
+        //                                 anomaly |-------------|
+        //
         parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
-
+        // merge the anomaly's properties into parent
+        Map<String, String> properties = parent.getProperties();
+        properties.putAll(anomaly.getProperties());
+        parent.setProperties(properties);
+        if (isExistingAnomaly(parent)) {
+          modifiedExistingIds.add(parent.getId());
+        }
       } else if (parent.getEndTime() >= anomaly.getStartTime()) {
-        // partially merge, truncate new
+        // mergeable but exceeds maxDuration, then truncate
+        //      parent |---------------------|
+        //                        anomaly |------------------------|
+        //
         long truncationTimestamp = Math.max(parent.getEndTime(), 
parent.getStartTime() + this.maxDuration);
 
         parent.setEndTime(truncationTimestamp);
@@ -180,29 +210,42 @@ public class MergeWrapper extends DetectionPipeline {
         anomaly.setEndTime(Math.max(truncationTimestamp, 
anomaly.getEndTime()));
 
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
+        if (isExistingAnomaly(parent)) {
+          modifiedExistingIds.add(parent.getId());
+        }
       } else {
         // default to new parent if merge not possible
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
       }
     }
 
-    return output;
+    // add modified existing anomalies into output
+    output.addAll(input.stream().filter(x -> x.getId()!= null && 
modifiedExistingIds.contains(x.getId())).collect(Collectors.toList()));
+
+    return new ArrayList<>(output);
   }
 
   /*
     Make sure that the anomalies generated from detector is shorter than 
maxDuration. Otherwise, split the anomaly
+    Do not split an anomaly if it is an existing anomaly.
    */
   private Collection<MergedAnomalyResultDTO> 
enforceMaxDuration(Collection<MergedAnomalyResultDTO> anomalies) {
     Set<MergedAnomalyResultDTO> result = new HashSet<>();
     for (MergedAnomalyResultDTO anomaly : anomalies) {
-      if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) {
-        result.addAll(splitAnomaly(anomaly, this.maxDuration));
-      } else {
+      if (isExistingAnomaly(anomaly)) {
         result.add(anomaly);
+      } else {
+        if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) {
+          result.addAll(splitAnomaly(anomaly, this.maxDuration));
+        } else {
+          result.add(anomaly);
+        }
       }
     }
     return result;
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
index 2378dad..6ea9c96 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
@@ -49,6 +49,12 @@ public class DetectionTestUtils {
     return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric, 
dataset, dimensions);
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(long start, long end, long 
id) {
+    MergedAnomalyResultDTO anomalyResultDTO = makeAnomaly(start, end);
+    anomalyResultDTO.setId(id);
+    return anomalyResultDTO;
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null, 
null, Collections.<String, String>emptyMap());
   }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index 1f679a0..29f08e4 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -59,6 +59,18 @@ public class MergeWrapperTest {
   private static final String PROP_MAX_GAP = "maxGap";
   private static final String PROP_MAX_DURATION = "maxDuration";
 
+  /*
+    Here are the anomalies in the test.
+
+    Existing anomalies:
+    0                1000             1500       2000
+    |-----------------|                |----------|
+    New  anomalies:
+                       1100  1200                     2200  2300
+                        |-----|                         |-----|
+                           1150 1250                            2400          2800
+                            |-----|                               |------------|
+  */
   @BeforeMethod
   public void beforeMethod() {
     this.runs = new ArrayList<>();
@@ -87,26 +99,19 @@ public class MergeWrapperTest {
     this.config.setProperties(this.properties);
 
     List<MergedAnomalyResultDTO> existing = new ArrayList<>();
-    existing.add(makeAnomaly(0, 1000));
-    existing.add(makeAnomaly(1500, 2000));
+    // For existing anomalies add ids.
+    existing.add(makeAnomaly(0, 1000, 0));
+    existing.add(makeAnomaly(1500, 2000, 1));
 
     this.outputs = new ArrayList<>();
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1100, 1200),
-        makeAnomaly(2200, 2300)
-    ), 2900));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1100, 
1200), makeAnomaly(2200, 2300)), 2900));
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1150, 1250),
-        makeAnomaly(2400, 2800)
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1150, 
1250), makeAnomaly(2400, 2800)), 3000));
 
     this.mockLoader = new MockPipelineLoader(this.runs, this.outputs);
 
-    this.provider = new MockDataProvider()
-        .setAnomalies(existing)
-        .setLoader(this.mockLoader);
+    this.provider = new 
MockDataProvider().setAnomalies(existing).setLoader(this.mockLoader);
   }
 
   @Test
@@ -115,7 +120,10 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 5);
+    Assert.assertEquals(output.getAnomalies().size(), 3);
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertEquals(output.getLastTimestamp(), 3000);
   }
 
@@ -126,10 +134,12 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 3);
+    // anomaly [1500, 2000] is not modified
+    Assert.assertEquals(output.getAnomalies().size(), 2);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
+    // anomalies [1100, 1200] and [1150,1250] are merged into [0, 1000]
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    // anomalies [2200, 2300] and [2400, 2800] are merged
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800)));
   }
 
@@ -143,8 +153,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 
1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
   }
 
@@ -153,10 +163,7 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3700),
-        makeAnomaly(3700, 3800)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 
3700), makeAnomaly(3700, 3800)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -169,8 +176,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 
1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
   }
@@ -180,10 +187,7 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3800),
-        makeAnomaly(3500, 3600)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 
3800), makeAnomaly(3500, 3600)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -196,8 +200,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 
1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
   }
@@ -206,10 +210,7 @@ public class MergeWrapperTest {
   public void testMergerMaxDurationEnforce() throws Exception {
     this.config.getProperties().put(PROP_MAX_DURATION, 500);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3800),
-        makeAnomaly(3500, 3600)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 
3800), makeAnomaly(3500, 3600)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -220,19 +221,15 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 8);
+    Assert.assertEquals(output.getAnomalies().size(), 5);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 500)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(500, 1000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3400, 3800)));
   }
 
-
   @Test
   public void testMergerExecution() throws Exception {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
@@ -263,15 +260,13 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1150, 1250, Collections.singletonMap("key", "value")),
-        makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(
+        Arrays.asList(makeAnomaly(1150, 1250, Collections.singletonMap("key", 
"value")),
+            makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", 
"value"))), 3000));
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1250, 1300, Collections.singletonMap("key", "value")),
-        makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", 
"otherValue"))
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(
+        Arrays.asList(makeAnomaly(1250, 1300, Collections.singletonMap("key", 
"value")),
+            makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", 
"otherValue"))), 3000));
 
     Map<String, Object> nestedPropertiesThree = new HashMap<>();
     nestedPropertiesThree.put(PROP_CLASS_NAME, "none");
@@ -289,12 +284,38 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 6);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 
1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1150, 1300, 
Collections.singletonMap("key", "value"))));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800, 
Collections.singletonMap("otherKey", "value"))));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2700, 2900, 
Collections.singletonMap("otherKey", "otherValue"))));
+    Assert.assertTrue(
+        output.getAnomalies().contains(makeAnomaly(2400, 2800, 
Collections.singletonMap("otherKey", "value"))));
+    Assert.assertTrue(
+        output.getAnomalies().contains(makeAnomaly(2700, 2900, 
Collections.singletonMap("otherKey", "otherValue"))));
   }
 
+  @Test
+  public void testMergeProperties() throws Exception {
+
+    MergedAnomalyResultDTO anomaly = makeAnomaly(1100, 1250);
+    String propertyKey = "trend_day1";
+    String propertyValue = "{trend_info}";
+    anomaly.setProperties(Collections.singletonMap(propertyKey, 
propertyValue));
+
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(anomaly), 3700));
+
+    Map<String, Object> nestedProperties = new HashMap<>();
+    nestedProperties.put(PROP_CLASS_NAME, "none");
+    nestedProperties.put(PROP_METRIC_URN, "thirdeye:metric:1");
+
+    this.nestedProperties.add(nestedProperties);
+
+    this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
+    DetectionPipelineResult output = this.wrapper.run();
+
+    Assert.assertEquals(output.getAnomalies().size(), 1);
+    Assert.assertEquals(output.getLastTimestamp(), 3700);
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 2800, 0)));
+    
Assert.assertTrue(output.getAnomalies().get(0).getProperties().get(propertyKey).equals(propertyValue));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to