Repository: apex-malhar Updated Branches: refs/heads/master e4908ddc2 -> abb3900c9
APEXMALHAR-2299 TimeBasedDedupOperator. Fixing edge case bug in time bucket assignment Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c118884f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c118884f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c118884f Branch: refs/heads/master Commit: c118884ff351e06d6ad86d9fd9e2de497e1ac356 Parents: 9eadce1 Author: francisf <francis.fsf...@gmail.com> Authored: Fri Oct 14 17:03:15 2016 +0530 Committer: francisf <francis.fsf...@gmail.com> Committed: Tue Oct 18 10:14:02 2016 +0530 ---------------------------------------------------------------------- .../lib/state/managed/TimeBucketAssigner.java | 2 +- .../state/managed/TimeBucketAssignerTest.java | 21 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c118884f/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java index 435ffe2..d218b37 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java @@ -124,7 +124,7 @@ public class TimeBucketAssigner implements ManagedStateComponent } long diffFromStart = value - fixedStart; long key = diffFromStart / bucketSpanMillis; - if (value > end) { + if (value >= end) { long diffInBuckets = (value - end) / bucketSpanMillis; long move = (diffInBuckets + 1) * bucketSpanMillis; start += move; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c118884f/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java index 4ceef1f..8ca0960 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java @@ -92,4 +92,25 @@ public class TimeBucketAssignerTest Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime)); testMeta.timeBucketAssigner.teardown(); } + + @Test + public void testTimeBucketKeyExpiry() + { + testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1)); + testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1)); + + long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); + testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); + + long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); + + long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) ); + + //Check for expiry of time1 now + Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); + + testMeta.timeBucketAssigner.teardown(); + } }