gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1592924655


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -187,17 +157,25 @@ public void process(final Record<K, V1> record) {
                         
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
                     } else {
                         sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-                        outerJoinStore.ifPresent(store -> store.put(
-                            TimestampedKeyAndJoinSide.make(isLeftSide, 
record.key(), inputRecordTimestamp),
-                            LeftOrRightValue.make(isLeftSide, 
record.value())));
+                        putInOuterJoinStore(record, inputRecordTimestamp);

Review Comment:
   This can just take `record` and call `record.timestamp()` itself.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
             // reset to MAX_VALUE in case the store is empty
             sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-            try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> it = store.all()) {
+            try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VL, VR>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
-                boolean outerJoinLeftWindowOpen = false;
-                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-                        // if windows are open for both joinSides we can break 
since there are no more candidates to emit
+                    final KeyValue<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VL, VR>> nextKeyValue = it.next();
+                    final TimestampedKeyAndJoinSide<K> 
timestampedKeyAndJoinSide = nextKeyValue.key;
+                    sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+                    if 
(isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && 
isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Can this condition ever fire?
   If timestampedKeyAndJoinSide.isLeftSide() is true, then only the first 
condition can be true.
   If timestampedKeyAndJoinSide.isLeftSide() is false, then only the second 
condition can be true.
   
   Previously, these were state variables shared across multiple iterations 
that incorporated multiple timestampedKeyAndJoinSide entries; now the condition 
is a function of just a single timestampedKeyAndJoinSide. Without knowing the 
context here, I would guess that this changes the behavior.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -275,22 +240,34 @@ private void emitNonJoinedOuterRecords(
             }
         }
 
-        @SuppressWarnings("unchecked")
-        private VOut getNullJoinedValue(
-            final K key, 
-            final LeftOrRightValue<V1, V2> leftOrRightValue) {
-            // depending on the JoinSide fill in the joiner key and joiner 
values
-            if (isLeftSide) {
-                return joiner.apply(key,
-                        leftOrRightValue.getLeftValue(),
-                        leftOrRightValue.getRightValue());
-            } else {
-                return joiner.apply(key,
-                        (V1) leftOrRightValue.getRightValue(),
-                        (V2) leftOrRightValue.getLeftValue());
+        private boolean isOuterJoinWindowOpenForSide(final 
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide, final boolean 
isLeftSide) {
+            if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
+                // there are no more candidates to emit on left-outerJoin-side
+                return timestampedKeyAndJoinSide.isLeftSide() == isLeftSide;
             }
+            return false;
+        }
+
+        private void forwardNonJoinedOuterRecords(final Record<K, ?> record, 
final KeyValue<? extends TimestampedKeyAndJoinSide<K>, ? extends 
LeftOrRightValue<VL, VR>> nextKeyValue) {
+            final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = 
nextKeyValue.key;
+            final K key = timestampedKeyAndJoinSide.getKey();
+            final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
+            final LeftOrRightValue<VL, VR> leftOrRightValue = 
nextKeyValue.value;
+            final VThis thisValue = getThisValue(leftOrRightValue);
+            final VOther otherValue = getOtherValue(leftOrRightValue);
+            final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
+            context().forward(
+                    
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+            );
+        }
+
+        private boolean isOuterJoinWindowOpen(final 
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
+            final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+            return sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs
+                    >= sharedTimeTracker.streamTime;
         }
 
+

Review Comment:
   nit: extra line



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -144,26 +129,11 @@ public void process(final Record<K, V1> record) {
                 outerJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, record));
             }
 
-            boolean needOuterJoin = outer;
-            try (final WindowStoreIterator<V2> iter = 
otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
-                while (iter.hasNext()) {
-                    needOuterJoin = false;
-                    final KeyValue<Long, V2> otherRecord = iter.next();
-                    final long otherRecordTimestamp = otherRecord.key;
-
-                    outerJoinStore.ifPresent(store -> {
-                        // use putIfAbsent to first read and see if there's 
any values for the key,
-                        // if yes delete the key, otherwise do not issue a put;
-                        // we may delete some values with the same key early 
but since we are going
-                        // range over all values of the same key even after 
failure, since the other window-store
-                        // is only cleaned up by stream time, so this is okay 
for at-least-once.
-                        
store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), 
otherRecordTimestamp), null);
-                    });
-
-                    context().forward(
-                        record.withValue(joiner.apply(record.key(), 
record.value(), otherRecord.value))
-                               .withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
-                }
+            final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+            final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+            try (final WindowStoreIterator<VOther> iter = 
otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
+                final boolean needOuterJoin = outer && !iter.hasNext();

Review Comment:
   I think it makes sense to make this a final variable, good find!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to