gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1592924655
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -187,17 +157,25 @@ public void process(final Record<K, V1> record) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); - outerJoinStore.ifPresent(store -> store.put( - TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), - LeftOrRightValue.make(isLeftSide, record.value()))); + putInOuterJoinStore(record, inputRecordTimestamp); Review Comment: This can just take `record` and call `record.timestamp()` itself. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords( // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; - try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { + try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>> it = store.all()) { TimestampedKeyAndJoinSide<K> prevKey = null; - boolean outerJoinLeftWindowOpen = false; - boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { - // if windows are open for both joinSides we can break since there are no more candidates to emit + final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>> nextKeyValue = it.next(); + final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key; + sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp(); + if (isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) { Review Comment: Can this condition ever fire? If timestampedKeyAndJoinSide.isLeftSide() is true, then only the first condition can be true. 
If timestampedKeyAndJoinSide.isLeftSide() is false, then only the second condition can be true. Previously, these were state variables shared across loop iterations, so they incorporated information from multiple timestampedKeyAndJoinSide values; now the condition is a function of just a single timestampedKeyAndJoinSide. Without knowing the context here, I would guess this changes the behavior. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -275,22 +240,34 @@ private void emitNonJoinedOuterRecords( } } - @SuppressWarnings("unchecked") - private VOut getNullJoinedValue( - final K key, - final LeftOrRightValue<V1, V2> leftOrRightValue) { - // depending on the JoinSide fill in the joiner key and joiner values - if (isLeftSide) { - return joiner.apply(key, - leftOrRightValue.getLeftValue(), - leftOrRightValue.getRightValue()); - } else { - return joiner.apply(key, - (V1) leftOrRightValue.getRightValue(), - (V2) leftOrRightValue.getLeftValue()); + private boolean isOuterJoinWindowOpenForSide(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide, final boolean isLeftSide) { + if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) { + // there are no more candidates to emit on left-outerJoin-side + return timestampedKeyAndJoinSide.isLeftSide() == isLeftSide; } + return false; + } + + private void forwardNonJoinedOuterRecords(final Record<K, ?> record, final KeyValue<? extends TimestampedKeyAndJoinSide<K>, ?
extends LeftOrRightValue<VL, VR>> nextKeyValue) { + final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key; + final K key = timestampedKeyAndJoinSide.getKey(); + final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); + final LeftOrRightValue<VL, VR> leftOrRightValue = nextKeyValue.value; + final VThis thisValue = getThisValue(leftOrRightValue); + final VOther otherValue = getOtherValue(leftOrRightValue); + final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); + context().forward( + record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) + ); + } + + private boolean isOuterJoinWindowOpen(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) { + final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); + return sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs + >= sharedTimeTracker.streamTime; } + Review Comment: nit: extra line ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -144,26 +129,11 @@ public void process(final Record<K, V1> record) { outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); } - boolean needOuterJoin = outer; - try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { - while (iter.hasNext()) { - needOuterJoin = false; - final KeyValue<Long, V2> otherRecord = iter.next(); - final long otherRecordTimestamp = otherRecord.key; - - outerJoinStore.ifPresent(store -> { - // use putIfAbsent to first read and see if there's any values for the key, - // if yes delete the key, otherwise do not issue a put; - // we may delete some values with the same key early but since we are going - // range over all values of the same key even after failure, since the other window-store - // is only cleaned up by stream time, so this is okay for at-least-once. 
- store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null); - }); - - context().forward( - record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) - .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); - } + final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); + final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + try (final WindowStoreIterator<VOther> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { + final boolean needOuterJoin = outer && !iter.hasNext(); Review Comment: I think it makes sense to make this a final variable. Good find! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org