Repository: kafka Updated Branches: refs/heads/trunk 4e1c7d844 -> cf8f4a713
KAFKA-4153: Fix incorrect KStream-KStream join behavior with asymmetric time window The contribution is my original work and I license the work to the project under the project's open source license. guozhangwang Author: Elias Levy <fearsome.lucid...@gmail.com> Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang Closes #1846 from eliaslevy/KAFKA-4153 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf8f4a71 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf8f4a71 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf8f4a71 Branch: refs/heads/trunk Commit: cf8f4a713b64f010c15a4a7b8dae616edb8a1b74 Parents: 4e1c7d8 Author: Elias Levy <fearsome.lucid...@gmail.com> Authored: Mon Sep 19 17:13:47 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Mon Sep 19 17:13:47 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/JoinWindows.java | 2 +- .../streams/kstream/internals/KStreamImpl.java | 2 +- .../internals/KStreamKStreamJoinTest.java | 217 +++++++++++++++++++ 3 files changed, 219 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 1ac606e..9317743 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -31,7 +31,7 @@ import java.util.Map; * WHERE * stream1.key = stream2.key * AND - * stream2.ts - before <= stream1.ts <= stream2.ts + after + * stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after * </pre> * There are three different window configuration supported: * <ul> http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b9ed19a..bf345e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -690,8 +690,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V joiner, outer); KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), - windows.before, windows.after, + windows.before, reverseJoiner(joiner), outer); http://git-wip-us.apache.org/repos/asf/kafka/blob/cf8f4a71/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 596d246..b504b8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -488,6 +488,223 @@ public class KStreamKStreamJoinTest { processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); } + @Test + public void testAsymetricWindowingAfter() throws Exception { + long time = 1000L; + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde); + joined.process(processor); + + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + driver = new KStreamTestDriver(builder, stateDir); + + for (int i = 0; i < expectedKeys.length; i++) { + setRecordContext(time + i, topic1); + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + processor.checkAndClearProcessResult(); + + + time = 1000L - 1L; + setRecordContext(time, topic2); + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + time = 1000 + 100L; + setRecordContext(time, topic2); + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + } + + @Test + public void testAsymetricWindowingBefore() throws Exception { + long time = 1000L; + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde); + joined.process(processor); + + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + driver = new KStreamTestDriver(builder, stateDir); + + for (int i = 0; i < expectedKeys.length; i++) { + setRecordContext(time + i, topic1); + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + processor.checkAndClearProcessResult(); + + + time = 1000L - 100L - 1L; + + setRecordContext(time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + time = 1000L; + + setRecordContext(time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("3:X3+YY3"); + + setRecordContext(++time, topic2); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + } + private void setRecordContext(final long time, final String topic) { ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic)); }