spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614313362
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; + joinFound = true; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; + + // Emit expired records before the joined record to keep time ordering + emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + } + + // Emit all expired records before adding a new non-joined record to the store. Otherwise, + // the put() call will advance the stream time, which causes records out of the retention + // period to be deleted, thus not being emitted later. + if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { + emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. 
A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. T10 and T2 should be emitted, but + // because T2 was late, it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // The condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(isLeftJoin, key), + LeftOrRightValue.make(isLeftJoin, value), + inputRecordTimestamp)); + } + } + } + } + + private void emitExpiredNonJoinedOuterRecords() { + outerJoinWindowStore.ifPresent(store -> + emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed)); + } + + private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) { + outerJoinWindowStore.ifPresent(store -> { + final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key); + + // Emit all expired records except the just found non-joined key. We need + // to emit all expired records before calling put(), otherwise the internal + // stream time will advance and may cause records out of the retention period to + // be deleted. + emitExpiredNonJoinedOuterRecords(store, + recordWindowHasClosed + .and(k -> !k.key().equals(keyAndJoinSide)) + .and(k -> k.window().start() != timestamp)); + + if (store.fetch(keyAndJoinSide, timestamp) != null) { + // Delete the record. The previous emit call may not have removed this record + // if the record window has not closed. 
+ store.put(keyAndJoinSide, null, timestamp); + } + }); + } + + @SuppressWarnings("unchecked") + private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) { Review comment: Done ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; + joinFound = true; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; + + // Emit expired records before the joined record to keep time ordering + emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + } + + // Emit all expired records before adding a new non-joined record to the store. Otherwise, + // the put() call will advance the stream time, which causes records out of the retention + // period to be deleted, thus not being emitted later. + if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { + emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. 
+ // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. T10 and T2 should be emitted, but + // because T2 was late, it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // The condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(isLeftJoin, key), + LeftOrRightValue.make(isLeftJoin, value), + inputRecordTimestamp)); + } + } + } + } + + private void emitExpiredNonJoinedOuterRecords() { + outerJoinWindowStore.ifPresent(store -> + emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed)); + } + + private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) { + outerJoinWindowStore.ifPresent(store -> { + final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key); + + // Emit all expired records except the just found non-joined key. We need + // to emit all expired records before calling put(), otherwise the internal + // stream time will advance and may cause records out of the retention period to + // be deleted. 
+ emitExpiredNonJoinedOuterRecords(store, + recordWindowHasClosed + .and(k -> !k.key().equals(keyAndJoinSide)) + .and(k -> k.window().start() != timestamp)); + + if (store.fetch(keyAndJoinSide, timestamp) != null) { + // Delete the record. The previous emit call may not have removed this record + // if the record window has not closed. + store.put(keyAndJoinSide, null, timestamp); + } + }); + } + + @SuppressWarnings("unchecked") + private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) { + try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) { + while (it.hasNext()) { + final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next(); Review comment: Done ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.KeyValue; + +import java.util.Objects; + +/** + * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is + * part of the left join (true) or right join (false). This class is only useful when a state + * store needs to be shared between left and right processors, and each processor needs to + * access the key of the other processor. + */ +public class KeyAndJoinSide<K> { + private final K key; + private final boolean leftJoin; + + private KeyAndJoinSide(final boolean leftJoin, final K key) { + this.key = Objects.requireNonNull(key, "key is null"); Review comment: Done ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; + +/** + * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class + * combines a key with a boolean value that specifies if the key is found on the left side of a + * join or on the right side. 
This {@link LeftOrRightValue} object contains either the V1 value, + * which is found in the left topic, or V2 value if it is found in the right topic. + */ +public class LeftOrRightValue<V1, V2> { + private final V1 leftValue; + private final V2 rightValue; + + private LeftOrRightValue(final V1 leftValue, final V2 rightValue) { + this.leftValue = leftValue; + this.rightValue = rightValue; + } + + /** + * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and + * V2 value as null. + * + * @param leftValue the left V1 value + * @param <V1> the type of the value + * @return a new {@link LeftOrRightValue} instance + */ + public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) { + Objects.requireNonNull(leftValue, "leftValue is null"); Review comment: Done ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; + +/** + * This class is used in combination with {@link KeyAndJoinSide}. 
The {@link KeyAndJoinSide} class + * combines a key with a boolean value that specifies if the key is found in the left side of a + * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value, + * which is found in the left topic, or V2 value if it is found in the right topic. + */ +public class LeftOrRightValue<V1, V2> { + private final V1 leftValue; + private final V2 rightValue; + + private LeftOrRightValue(final V1 leftValue, final V2 rightValue) { + this.leftValue = leftValue; Review comment: Done ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.KeyValue; + +import java.util.Objects; + +/** + * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is + * part of the left join (true) or right join (false). This class is only useful when a state + * store needs to be shared between left and right processors, and each processor needs to + * access the key of the other processor. 
+ */ +public class KeyAndJoinSide<K> { + private final K key; + private final boolean leftJoin; Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org