guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616218643



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =

Review comment:
       nit: I think this function can just be inlined now?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = leftOuter ? 
"-shared-left-outer-join" : "-shared-outer-join";

Review comment:
       I think @mjsax's comment about naming is just about aligning with the 
join type. Note that `leftOuter` is always true once we have passed `if 
(leftOuter || rightOuter)`, since `leftOuter == false && rightOuter == true` is 
not a possible case. Also, to align with the terms "-outer-this-join" / 
"-this-join", I think it should be:
   
   rightOuter ? "-outer-shared-join" : "-left-shared-join";
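
A minimal sketch of that selection, assuming (as noted above) that `rightOuter 
== true` implies `leftOuter == true`. The class and method names are 
hypothetical and only illustrate the suggested suffixes; they are not part of 
the PR:

```java
// Hypothetical helper, not part of the PR: picks the shared-store name suffix.
final class SharedJoinStoreSuffix {

    // Assumes the caller has already checked (leftOuter || rightOuter), so
    // leftOuter is always true here and only rightOuter distinguishes the cases.
    static String suffixFor(final boolean rightOuter) {
        return rightOuter ? "-outer-shared-join" : "-left-shared-join";
    }

    public static void main(final String[] args) {
        System.out.println(suffixFor(true));   // full outer join
        System.out.println(suffixFor(false));  // left join
    }
}
```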

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        if (leftValue != null && rightValue != null) {

Review comment:
       Hmm, I thought it was possible for both sides to be null? E.g. for a 
left-side record where the value itself is `null` (we do not filter out null 
values in a stream at the moment, right @mjsax?).
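
To illustrate the concern, here is a small sketch using a hypothetical 
stand-in class (`NullableSideValue`, not the PR's `LeftOrRightValue`). It 
assumes the constructor only rejects the case where both sides are set; the 
exception and its message are my assumption, since the hunk above is cut off. 
Under that assumption, a left record and a right record whose values are both 
`null` look identical, so the side has to come from somewhere else (e.g. the 
`KeyAndJoinSide` key):

```java
import java.util.Objects;

// Hypothetical stand-in for illustration only; not the PR's LeftOrRightValue.
final class NullableSideValue<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private NullableSideValue(final V1 leftValue, final V2 rightValue) {
        // Assumed behavior: only "both sides set" is rejected, so a record
        // whose own value is null leaves both fields null.
        if (leftValue != null && rightValue != null) {
            throw new IllegalArgumentException("Only one side may hold a value");
        }
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> NullableSideValue<V1, V2> makeLeft(final V1 leftValue) {
        return new NullableSideValue<>(leftValue, null);
    }

    static <V1, V2> NullableSideValue<V1, V2> makeRight(final V2 rightValue) {
        return new NullableSideValue<>(null, rightValue);
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NullableSideValue)) {
            return false;
        }
        final NullableSideValue<?, ?> that = (NullableSideValue<?, ?>) o;
        return Objects.equals(leftValue, that.leftValue)
            && Objects.equals(rightValue, that.rightValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leftValue, rightValue);
    }

    public static void main(final String[] args) {
        final NullableSideValue<String, Integer> left = makeLeft(null);
        final NullableSideValue<String, Integer> right = makeRight(null);
        // Both objects hold (null, null), so the side cannot be recovered.
        System.out.println(left.equals(right)); // prints true
    }
}
```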

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {

Review comment:
       I think the motivation is that if the current record's timestamp is too 
small (i.e. the record is too late), then it should not be added to the 
book-keeping store but can be "expired" immediately. But the condition seems a 
bit off here: for the record to be "too late", its timestamp just needs to be 
smaller than the expiration boundary, which is observed-stream-time - 
join-after - grace-period, right?
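
A minimal sketch of the two checks side by side, assuming all values are in the 
same time unit. `LateRecordCheck` and its method names are hypothetical; 
`conditionFromDiff` restates `timeTo < maxObservedStreamTime.get()` from the 
hunk above, and `isTooLate` is the boundary I have in mind:

```java
// Hypothetical helper for illustration; not part of the PR.
final class LateRecordCheck {

    // Boundary described in the comment: a record is "too late" when its
    // timestamp is below observedStreamTime - joinAfterMs - joinGraceMs.
    static boolean isTooLate(final long recordTimestamp,
                             final long observedStreamTime,
                             final long joinAfterMs,
                             final long joinGraceMs) {
        return recordTimestamp < observedStreamTime - joinAfterMs - joinGraceMs;
    }

    // Condition as quoted in the diff: timeTo < maxObservedStreamTime,
    // where timeTo = max(0, recordTimestamp + joinAfterMs). Grace is ignored.
    static boolean conditionFromDiff(final long recordTimestamp,
                                     final long observedStreamTime,
                                     final long joinAfterMs) {
        final long timeTo = Math.max(0L, recordTimestamp + joinAfterMs);
        return timeTo < observedStreamTime;
    }

    public static void main(final String[] args) {
        // With a non-zero grace period the two checks can disagree:
        // timestamp 4, stream time 10, join-after 5, grace 3.
        System.out.println(conditionFromDiff(4L, 10L, 5L)); // 9 < 10
        System.out.println(isTooLate(4L, 10L, 5L, 3L));     // 4 < 2
    }
}
```

With a grace period of zero the two checks agree; they only diverge once grace 
is non-zero, which is the case I am worried about.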

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (internalOuterJoinFixEnabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixEnabled(final Map<String, Object> 
configs) {

Review comment:
       See my other comment on KStreamImplJoin: I think we should move this 
check to KStreamImplJoin to decide whether or not to create the store builder, 
and here we can just rely on whether the passed-in outerJoinWindowName is empty 
or not to get the store (if it is not empty, then the store must have been 
created).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {

Review comment:
       If the `INTERNAL_ENABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX` config is set 
to false, we would end up creating a store but never using it.
   
   So I think we also need to check this config here during plan generation: 
if it is false, leave the storeBuilder as an empty Optional.
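
Roughly what I have in mind, as a plain-Java sketch. Everything here is 
hypothetical: the class, the method names, the config key string (a placeholder 
standing in for `INTERNAL_ENABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX`), and the 
default of "enabled" when the key is absent. The store name stands in for the 
store builder:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of deciding at plan-generation time whether the shared
// outer-join store should exist at all. Not Kafka's actual API.
final class OuterJoinStorePlanning {

    // Placeholder key; the real constant's string value is not shown in this PR thread.
    static final String FIX_ENABLED_KEY = "hypothetical.enable.outer.join.fix";

    // Assumption: the fix is treated as enabled when the key is absent.
    static boolean fixEnabled(final Map<String, Object> configs) {
        final Object value = configs.get(FIX_ENABLED_KEY);
        if (value == null) {
            return true;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(value.toString());
    }

    // Returns a store name only when the join is left/outer AND the fix is
    // enabled; otherwise the Optional stays empty and no store is created.
    static Optional<String> outerJoinStoreName(final boolean leftOuter,
                                               final boolean rightOuter,
                                               final Map<String, Object> configs,
                                               final String joinName) {
        if ((leftOuter || rightOuter) && fixEnabled(configs)) {
            final String suffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
            return Optional.of(joinName + suffix);
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(FIX_ENABLED_KEY, false);
        // Fix disabled: no store name, so the processor has nothing to look up.
        System.out.println(outerJoinStoreName(true, true, configs, "join").isPresent());
    }
}
```

The processor can then rely on the Optional alone and does not need its own 
config check.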

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);

Review comment:
       nit: not introduced by this PR, but let's rename it to 
`otherWindowStore` for naming consistency.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamOuterJoinTest {

Review comment:
       Thanks for the added tests! I'm wondering if we can add a case for both 
left/outer joins where the input streams have consecutive records with exactly 
the same key / timestamp (i.e. they are "duplicates") and check that 1) we do 
not de-duplicate them, 2) when a join is not found in time, we expire / delete 
them together, with two emitted results, and 3) when a join is found in time, 
we emit two join results.
   
   Also, could we have a test case checking that, when a single record causes 
both expiration and join results, the emitted records are still sorted in 
timestamp order?
   
   I raise these only because I have not yet found these two cases in the 
tests; if there's already coverage, please ignore this comment.



