Repository: apex-malhar
Updated Branches:
  refs/heads/master e4908ddc2 -> abb3900c9


APEXMALHAR-2299 TimeBasedDedupOperator: Fixing edge-case bug in time bucket assignment


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c118884f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c118884f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c118884f

Branch: refs/heads/master
Commit: c118884ff351e06d6ad86d9fd9e2de497e1ac356
Parents: 9eadce1
Author: francisf <francis.fsf...@gmail.com>
Authored: Fri Oct 14 17:03:15 2016 +0530
Committer: francisf <francis.fsf...@gmail.com>
Committed: Tue Oct 18 10:14:02 2016 +0530

----------------------------------------------------------------------
 .../lib/state/managed/TimeBucketAssigner.java   |  2 +-
 .../state/managed/TimeBucketAssignerTest.java   | 21 ++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c118884f/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
index 435ffe2..d218b37 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -124,7 +124,7 @@ public class TimeBucketAssigner implements ManagedStateComponent
     }
     long diffFromStart = value - fixedStart;
     long key = diffFromStart / bucketSpanMillis;
-    if (value > end) {
+    if (value >= end) {
       long diffInBuckets = (value - end) / bucketSpanMillis;
       long move = (diffInBuckets + 1) * bucketSpanMillis;
       start += move;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c118884f/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
index 4ceef1f..8ca0960 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java
@@ -92,4 +92,25 @@ public class TimeBucketAssignerTest
     Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime));
     testMeta.timeBucketAssigner.teardown();
   }
+
+  @Test
+  public void testTimeBucketKeyExpiry()
+  {
+    testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
+    testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+
+    long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
+    testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+
+    long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
+    Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
+
+    long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
+    Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) );
+
+    //Check for expiry of time1 now
+    Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) );
+
+    testMeta.timeBucketAssigner.teardown();
+  }
 }

Reply via email to