This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch update_merge_logic in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 48969567d08c706a66c7bcbea5365bf32300cd74 Author: Xiaohui Sun <xh...@xhsun-mn3.linkedin.biz> AuthorDate: Fri May 10 14:25:00 2019 -0700 [TE] Update anomaly merge logic 1. Don't output existing anomalies that do not get merged. 2. Merge new anomaly's properties into existing anomaly. 3. Do not split existing anomalies. --- .../thirdeye/detection/algorithm/MergeWrapper.java | 73 +++++++++--- .../thirdeye/detection/DetectionTestUtils.java | 6 + .../detection/algorithm/MergeWrapperTest.java | 125 ++++++++++++--------- 3 files changed, 137 insertions(+), 67 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java index 5acf958..fc64501 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.collections.MapUtils; import org.apache.pinot.thirdeye.common.dimension.DimensionMap; import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; @@ -146,11 +147,18 @@ public class MergeWrapper extends DetectionPipeline { return new ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice)); } - // logic to do time-based merging. + private boolean isExistingAnomaly(MergedAnomalyResultDTO anomaly) { + return anomaly.getId() != null; + } + + // Merge new anomalies into existing anomalies. Return the anomalies that need to update or add. + // If it is existing anomaly and not updated then it is not returned. 
protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) { List<MergedAnomalyResultDTO> input = new ArrayList<>(enforceMaxDuration(anomalies)); Collections.sort(input, COMPARATOR); + // stores all the existing anomalies that need to modified + Set<Long> modifiedExistingIds = new HashSet<>(); List<MergedAnomalyResultDTO> output = new ArrayList<>(); Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>(); @@ -163,16 +171,38 @@ public class MergeWrapper extends DetectionPipeline { MergedAnomalyResultDTO parent = parents.get(key); if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) { - // no parent, too far away + // no parent, too far away to merge + // parent |-------------| + // anomaly |---------------| + // parents.put(key, anomaly); - output.add(anomaly); - + if (!isExistingAnomaly(anomaly)) { + output.add(anomaly); + } } else if (anomaly.getEndTime() <= parent.getEndTime() || anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) { - // fully merge into existing + // fully cover + // parent |-------------------| + // anomaly |-------------| + // or mergeable + // parent |-------------------| + // anomaly |-------------| + // or small gap + // parent |-------------------| + // anomaly |-------------| + // parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime())); - + // merge the anomaly's properties into parent + Map<String, String> properties = parent.getProperties(); + properties.putAll(anomaly.getProperties()); + parent.setProperties(properties); + if (isExistingAnomaly(parent)) { + modifiedExistingIds.add(parent.getId()); + } } else if (parent.getEndTime() >= anomaly.getStartTime()) { - // partially merge, truncate new + // mergeable but exceeds maxDuration, then truncate + // parent |---------------------| + // anomaly |------------------------| + // long truncationTimestamp = Math.max(parent.getEndTime(), parent.getStartTime() + this.maxDuration); 
parent.setEndTime(truncationTimestamp); @@ -180,29 +210,42 @@ public class MergeWrapper extends DetectionPipeline { anomaly.setEndTime(Math.max(truncationTimestamp, anomaly.getEndTime())); parents.put(key, anomaly); - output.add(anomaly); - + if (!isExistingAnomaly(anomaly)) { + output.add(anomaly); + } + if (isExistingAnomaly(parent)) { + modifiedExistingIds.add(parent.getId()); + } } else { // default to new parent if merge not possible parents.put(key, anomaly); - output.add(anomaly); - + if (!isExistingAnomaly(anomaly)) { + output.add(anomaly); + } } } - return output; + // add modified existing anomalies into output + output.addAll(input.stream().filter(x -> x.getId()!= null && modifiedExistingIds.contains(x.getId())).collect(Collectors.toList())); + + return new ArrayList<>(output); } /* Make sure that the anomalies generated from detector is shorter than maxDuration. Otherwise, split the anomaly + Do not split anomaly if it is existing anomaly. */ private Collection<MergedAnomalyResultDTO> enforceMaxDuration(Collection<MergedAnomalyResultDTO> anomalies) { Set<MergedAnomalyResultDTO> result = new HashSet<>(); for (MergedAnomalyResultDTO anomaly : anomalies) { - if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) { - result.addAll(splitAnomaly(anomaly, this.maxDuration)); - } else { + if (isExistingAnomaly(anomaly)) { result.add(anomaly); + } else { + if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) { + result.addAll(splitAnomaly(anomaly, this.maxDuration)); + } else { + result.add(anomaly); + } } } return result; diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java index 2378dad..6ea9c96 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java +++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java @@ -49,6 +49,12 @@ public class DetectionTestUtils { return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric, dataset, dimensions); } + public static MergedAnomalyResultDTO makeAnomaly(long start, long end, long id) { + MergedAnomalyResultDTO anomalyResultDTO = makeAnomaly(start, end); + anomalyResultDTO.setId(id); + return anomalyResultDTO; + } + public static MergedAnomalyResultDTO makeAnomaly(long start, long end) { return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null, null, Collections.<String, String>emptyMap()); } diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java index 1f679a0..29f08e4 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java @@ -59,6 +59,18 @@ public class MergeWrapperTest { private static final String PROP_MAX_GAP = "maxGap"; private static final String PROP_MAX_DURATION = "maxDuration"; + /* + Here are the anomalies in the test. + + Existing anomalies: + 0 1000 1500 2000 + |-----------------| |----------| + New anomalies: + 1100 1200 2200 2300 + |-----| |-----| + 1150 1250 2400 2800 + |-----| |------------| + */ @BeforeMethod public void beforeMethod() { this.runs = new ArrayList<>(); @@ -87,26 +99,19 @@ public class MergeWrapperTest { this.config.setProperties(this.properties); List<MergedAnomalyResultDTO> existing = new ArrayList<>(); - existing.add(makeAnomaly(0, 1000)); - existing.add(makeAnomaly(1500, 2000)); + // For existing anomalies add ids. 
+ existing.add(makeAnomaly(0, 1000, 0)); + existing.add(makeAnomaly(1500, 2000, 1)); this.outputs = new ArrayList<>(); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(1100, 1200), - makeAnomaly(2200, 2300) - ), 2900)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1100, 1200), makeAnomaly(2200, 2300)), 2900)); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(1150, 1250), - makeAnomaly(2400, 2800) - ), 3000)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1150, 1250), makeAnomaly(2400, 2800)), 3000)); this.mockLoader = new MockPipelineLoader(this.runs, this.outputs); - this.provider = new MockDataProvider() - .setAnomalies(existing) - .setLoader(this.mockLoader); + this.provider = new MockDataProvider().setAnomalies(existing).setLoader(this.mockLoader); } @Test @@ -115,7 +120,10 @@ public class MergeWrapperTest { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 5); + Assert.assertEquals(output.getAnomalies().size(), 3); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); Assert.assertEquals(output.getLastTimestamp(), 3000); } @@ -126,10 +134,12 @@ public class MergeWrapperTest { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 3); + // anomaly [1500, 2000] is not modified + Assert.assertEquals(output.getAnomalies().size(), 2); Assert.assertEquals(output.getLastTimestamp(), 3000); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000))); + // anomalies 
[1100, 1200] and [1150,1250] are merged into [0, 1000] + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0))); + // anomalies [2200, 2300] and [2400, 2800] are merged Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800))); } @@ -143,8 +153,8 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 3); Assert.assertEquals(output.getLastTimestamp(), 3000); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); } @@ -153,10 +163,7 @@ public class MergeWrapperTest { this.config.getProperties().put(PROP_MAX_GAP, 200); this.config.getProperties().put(PROP_MAX_DURATION, 1250); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(2800, 3700), - makeAnomaly(3700, 3800) - ), 3700)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3700), makeAnomaly(3700, 3800)), 3700)); Map<String, Object> nestedProperties = new HashMap<>(); nestedProperties.put(PROP_CLASS_NAME, "none"); @@ -169,8 +176,8 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 4); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800))); } @@ -180,10 +187,7 @@ public class MergeWrapperTest { 
this.config.getProperties().put(PROP_MAX_GAP, 200); this.config.getProperties().put(PROP_MAX_DURATION, 1250); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(2800, 3800), - makeAnomaly(3500, 3600) - ), 3700)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3800), makeAnomaly(3500, 3600)), 3700)); Map<String, Object> nestedProperties = new HashMap<>(); nestedProperties.put(PROP_CLASS_NAME, "none"); @@ -196,8 +200,8 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 4); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800))); } @@ -206,10 +210,7 @@ public class MergeWrapperTest { public void testMergerMaxDurationEnforce() throws Exception { this.config.getProperties().put(PROP_MAX_DURATION, 500); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(2800, 3800), - makeAnomaly(3500, 3600) - ), 3700)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3800), makeAnomaly(3500, 3600)), 3700)); Map<String, Object> nestedProperties = new HashMap<>(); nestedProperties.put(PROP_CLASS_NAME, "none"); @@ -220,19 +221,15 @@ public class MergeWrapperTest { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 8); + Assert.assertEquals(output.getAnomalies().size(), 5); Assert.assertEquals(output.getLastTimestamp(), 3700); - 
Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 500))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(500, 1000))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3400, 3800))); } - @Test public void testMergerExecution() throws Exception { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); @@ -263,15 +260,13 @@ public class MergeWrapperTest { this.config.getProperties().put(PROP_MAX_GAP, 200); this.config.getProperties().put(PROP_MAX_DURATION, 1250); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(1150, 1250, Collections.singletonMap("key", "value")), - makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value")) - ), 3000)); + this.outputs.add(new MockPipelineOutput( + Arrays.asList(makeAnomaly(1150, 1250, Collections.singletonMap("key", "value")), + makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))), 3000)); - this.outputs.add(new MockPipelineOutput(Arrays.asList( - makeAnomaly(1250, 1300, Collections.singletonMap("key", "value")), - makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue")) - ), 3000)); + this.outputs.add(new MockPipelineOutput( + Arrays.asList(makeAnomaly(1250, 1300, Collections.singletonMap("key", "value")), + makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))), 3000)); Map<String, Object> nestedPropertiesThree = new HashMap<>(); nestedPropertiesThree.put(PROP_CLASS_NAME, "none"); @@ -289,12 +284,38 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 6); 
Assert.assertEquals(output.getLastTimestamp(), 3000); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1150, 1300, Collections.singletonMap("key", "value")))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value")))); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue")))); + Assert.assertTrue( + output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value")))); + Assert.assertTrue( + output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue")))); } + @Test + public void testMergeProperties() throws Exception { + + MergedAnomalyResultDTO anomaly = makeAnomaly(1100, 1250); + String propertyKey = "trend_day1"; + String propertyValue = "{trend_info}"; + anomaly.setProperties(Collections.singletonMap(propertyKey, propertyValue)); + + this.outputs.add(new MockPipelineOutput(Arrays.asList(anomaly), 3700)); + + Map<String, Object> nestedProperties = new HashMap<>(); + nestedProperties.put(PROP_CLASS_NAME, "none"); + nestedProperties.put(PROP_METRIC_URN, "thirdeye:metric:1"); + + this.nestedProperties.add(nestedProperties); + + this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000); + DetectionPipelineResult output = this.wrapper.run(); + + Assert.assertEquals(output.getAnomalies().size(), 1); + Assert.assertEquals(output.getLastTimestamp(), 3700); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 2800, 0))); + 
Assert.assertTrue(output.getAnomalies().get(0).getProperties().get(propertyKey).equals(propertyValue)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org