Repository: kafka
Updated Branches:
  refs/heads/trunk 4e1c7d844 -> cf8f4a713


KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric time window

The contribution is my original work and I license the work to the project 
under the project's open source license.

guozhangwang

Author: Elias Levy <fearsome.lucid...@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #1846 from eliaslevy/KAFKA-4153


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf8f4a71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf8f4a71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf8f4a71

Branch: refs/heads/trunk
Commit: cf8f4a713b64f010c15a4a7b8dae616edb8a1b74
Parents: 4e1c7d8
Author: Elias Levy <fearsome.lucid...@gmail.com>
Authored: Mon Sep 19 17:13:47 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Sep 19 17:13:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      |   2 +-
 .../streams/kstream/internals/KStreamImpl.java  |   2 +-
 .../internals/KStreamKStreamJoinTest.java       | 217 +++++++++++++++++++
 3 files changed, 219 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 1ac606e..9317743 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -31,7 +31,7 @@ import java.util.Map;
  *     WHERE
  *       stream1.key = stream2.key
  *       AND
- *       stream2.ts - before <= stream1.ts <= stream2.ts + after
+ *       stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
  * </pre>
  * There are three different window configuration supported:
  * <ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b9ed19a..bf345e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -690,8 +690,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                
   joiner,
                                                                                
   outer);
             KStreamKStreamJoin<K1, R, V2, V1> joinOther = new 
KStreamKStreamJoin<>(thisWindow.name(),
-                                                                               
    windows.before,
                                                                                
    windows.after,
+                                                                               
    windows.before,
                                                                                
    reverseJoiner(joiner),
                                                                                
    outer);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 596d246..b504b8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -488,6 +488,223 @@ public class KStreamKStreamJoinTest {
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", 
"2:XX2+Y2", "3:XX3+Y3");
     }
 
+    @Test
+    public void testAsymetricWindowingAfter() throws Exception {
+        long time = 1000L;
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
+
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde); // join iff stream1.ts <= stream2.ts <= stream1.ts + 100
+        joined.process(processor);
+
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        for (int i = 0; i < expectedKeys.length; i++) { // stream1 records X0..X3 at ts 1000..1003
+            setRecordContext(time + i, topic1);
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+        processor.checkAndClearProcessResult();
+
+
+        time = 1000L - 1L; // ts 999 precedes every stream1 record: nothing joins
+        setRecordContext(time, topic2);
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        setRecordContext(++time, topic2); // ts 1000..1003: each step admits one more stream1 record
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        time = 1000 + 100L; // ts 1100 is within +100 of every stream1 record: all keys join
+        setRecordContext(time, topic2);
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2); // ts 1101..1104: each step drops the oldest stream1 record
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+    }
+
+    @Test
+    public void testAsymetricWindowingBefore() throws Exception {
+        long time = 1000L;
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
+
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde); // join iff stream1.ts - 100 <= stream2.ts <= stream1.ts
+        joined.process(processor);
+
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        for (int i = 0; i < expectedKeys.length; i++) { // stream1 records X0..X3 at ts 1000..1003
+            setRecordContext(time + i, topic1);
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+        processor.checkAndClearProcessResult();
+
+
+        time = 1000L - 100L - 1L; // ts 899 is more than 100 before every stream1 record: nothing joins
+
+        setRecordContext(time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        setRecordContext(++time, topic2); // ts 900..903: each step admits one more stream1 record
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        time = 1000L; // ts 1000 is at or before every stream1 record: all keys join
+
+        setRecordContext(time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2); // ts 1001..1004: each step drops the oldest stream1 record
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("3:X3+YY3");
+
+        setRecordContext(++time, topic2);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+    }
+
     private void setRecordContext(final long time, final String topic) {
         ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }

Reply via email to