mjsax commented on code in PR #22156:
URL: https://github.com/apache/kafka/pull/22156#discussion_r3365829255
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -145,7 +149,9 @@ public void process(final Record<K, VThis> record) {
// Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
- outerJoinStore.ifPresent(store ->
emitNonJoinedOuterRecords(store, record));
+ if (hasOuterJoinStore()) {
Review Comment:
If we use `Optional` we don't need this helper method.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -248,38 +253,40 @@ private void emitNonJoinedOuterRecords(final
KeyValueStore<TimestampedKeyAndJoin
continue;
}
- final LeftOrRightValue<VLeft, VRight> leftOrRightValue =
nextKeyValue.value;
- forwardNonJoinedOuterRecords(record,
timestampedKeyAndJoinSide, leftOrRightValue);
+ forwardNonJoinedOuterRecords(record,
timestampedKeyAndJoinSide, nextKeyValue.value);
Review Comment:
Why this change? I think it makes the code less readable. It's not immediately
clear what `nextKeyValue.value` is, and keeping `final LeftOrRightValue<VLeft,
VRight> leftOrRightValue = nextKeyValue.value;` makes it easier to read IMHO.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import
org.apache.kafka.streams.kstream.internals.AbstractConfigurableStoreFactory;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the
processor only deals
+ * with a single value shape: {@code
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.
+ * <p>
+ * The underlying store is one of:
+ * <ul>
+ * <li>plain: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<VLeft, VRight>>}</li>
+ * <li>headers-aware: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>,
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>>}</li>
Review Comment:
I am wondering if we should do the wrapping differently, i.e.,
`LeftOrRightValue<ValueTimestampHeaders<VLeft>, ValueTimestampHeaders<VRight>>`?
In that case, I guess we would also replace `ValueTimestampHeaders` with
`AggregationWithHeaders`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the
processor only deals
+ * with a single value shape: {@code
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.
Review Comment:
I don't think we need to store the timestamp -- the timestamp goes into the
key for this store, because we want to sort all entries by timestamp.
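To illustrate the point about the key carrying the timestamp, here is a minimal, self-contained sketch. `TimestampedKeySketch` and `OuterJoinStoreSketch` are hypothetical, simplified stand-ins (a `TreeMap` plays the role of the sorted store); they are not the actual `TimestampedKeyAndJoinSide` or store classes from this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplified key; the real TimestampedKeyAndJoinSide differs.
final class TimestampedKeySketch implements Comparable<TimestampedKeySketch> {
    final long timestamp;
    final boolean leftSide;
    final String key;

    TimestampedKeySketch(final long timestamp, final boolean leftSide, final String key) {
        this.timestamp = timestamp;
        this.leftSide = leftSide;
        this.key = key;
    }

    @Override
    public int compareTo(final TimestampedKeySketch other) {
        // Timestamp is compared first, so iteration order is timestamp order.
        int cmp = Long.compare(timestamp, other.timestamp);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Boolean.compare(leftSide, other.leftSide);
        if (cmp != 0) {
            return cmp;
        }
        return key.compareTo(other.key);
    }
}

final class OuterJoinStoreSketch {
    // TreeMap stands in for a sorted key-value store (assumption for illustration).
    private final TreeMap<TimestampedKeySketch, String> store = new TreeMap<>();

    void put(final long timestamp, final boolean leftSide, final String key, final String value) {
        store.put(new TimestampedKeySketch(timestamp, leftSide, key), value);
    }

    // Returns the timestamps of all entries older than the bound, oldest first.
    List<Long> timestampsBefore(final long bound) {
        final List<Long> result = new ArrayList<>();
        for (final Map.Entry<TimestampedKeySketch, String> entry : store.entrySet()) {
            if (entry.getKey().timestamp >= bound) {
                break; // sorted by timestamp, so no later entry can qualify
            }
            result.add(entry.getKey().timestamp); // read from the key, not the value
        }
        return result;
    }
}
```

Because the timestamp is already available from the key during iteration, the value in this sketch does not need to carry it a second time.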
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -117,7 +118,10 @@ public void init(final ProcessorContext<K, VOut> context) {
sharedTimeTracker =
sharedTimeTrackerSupplier.get(context.taskId());
if (enableSpuriousResultFix) {
- outerJoinStore = outerJoinWindowStoreFactory.map(s ->
context.getStateStore(s.storeName()));
+ outerJoinStoreWrapper = outerJoinWindowStoreFactory
+ .map(s -> new OuterJoinStoreWrapper<K, VLeft,
VRight>(context, s))
+ .orElse(null);
Review Comment:
Instead of setting `outerJoinStoreWrapper` to `null` here, should we just make
it an `Optional` itself, similar to the previous `outerJoinStore`?
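A minimal sketch of the `Optional`-based shape I have in mind, using only `java.util.Optional`. `Wrapper` and `JoinProcessorSketch` are hypothetical stand-ins for `OuterJoinStoreWrapper` and the join processor, not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for OuterJoinStoreWrapper; not the PR's actual class.
final class Wrapper {
    final List<String> emitted = new ArrayList<>();

    void emit(final String record) {
        emitted.add(record);
    }
}

final class JoinProcessorSketch {
    // An Optional field instead of a nullable one: no null checks, no hasX() helper.
    private Optional<Wrapper> outerJoinStoreWrapper = Optional.empty();

    void init(final Optional<String> storeFactory) {
        // map() stays empty when no factory is configured, so orElse(null) is not needed.
        outerJoinStoreWrapper = storeFactory.map(name -> new Wrapper());
    }

    void process(final String record) {
        // Runs only when a wrapper exists; takes the place of an explicit presence check.
        outerJoinStoreWrapper.ifPresent(wrapper -> wrapper.emit(record));
    }

    int emittedCount() {
        return outerJoinStoreWrapper.map(wrapper -> wrapper.emitted.size()).orElse(0);
    }
}
```

With this shape, `process()` can keep the `ifPresent(...)` call style that the previous `outerJoinStore` code used.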
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java:
##########
@@ -33,9 +32,9 @@ public void put(final Bytes key, final byte[] value) {
// we need to log the full new list and thus call get() on the inner
store below
// if the value is a tombstone, we delete the whole list and thus can
save the get call
if (value == null) {
- log(key, null, internalContext.recordContext().timestamp(), new
RecordHeaders());
+ log(key, null, internalContext.recordContext().timestamp(),
internalContext.recordContext().headers());
} else {
- log(key, wrapped().get(key),
internalContext.recordContext().timestamp(), new RecordHeaders());
+ log(key, wrapped().get(key),
internalContext.recordContext().timestamp(),
internalContext.recordContext().headers());
Review Comment:
Not sure if this is correct? Don't we need a
`ChangeLoggingListValueByteStoreWithHeader` class that gets the headers from the
`valueAndHeaders` byte array, similar to what we do with
`ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders`?
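To make the idea concrete, here is a rough sketch of reading headers back out of the stored bytes instead of taking them from the record context. The layout used here (`[int headerLength][headerBytes][valueBytes]`) and the class `ValueAndHeadersSketch` are purely hypothetical and are not Kafka's actual serialization format:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical layout for illustration only: [int headerLength][headerBytes][valueBytes].
// This is NOT Kafka's actual on-disk or changelog format.
final class ValueAndHeadersSketch {

    // Packs serialized headers and a serialized value into one byte array.
    static byte[] encode(final byte[] headers, final byte[] value) {
        return ByteBuffer.allocate(Integer.BYTES + headers.length + value.length)
            .putInt(headers.length)
            .put(headers)
            .put(value)
            .array();
    }

    // Extracts the header bytes that were stored together with the value.
    static byte[] headers(final byte[] valueAndHeaders) {
        final ByteBuffer buffer = ByteBuffer.wrap(valueAndHeaders);
        final byte[] headers = new byte[buffer.getInt()];
        buffer.get(headers);
        return headers;
    }

    // Extracts the plain value bytes, skipping the length prefix and the headers.
    static byte[] value(final byte[] valueAndHeaders) {
        final int headerLength = ByteBuffer.wrap(valueAndHeaders).getInt();
        return Arrays.copyOfRange(valueAndHeaders, Integer.BYTES + headerLength, valueAndHeaders.length);
    }
}
```

Under this assumption, a headers-aware change logger would log `headers(valueAndHeaders)` rather than the headers of whichever record happens to be in flight.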
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the
processor only deals
+ * with a single value shape: {@code
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.
+ * <p>
+ * The underlying store is one of:
+ * <ul>
+ * <li>plain: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>,
LeftOrRightValue<VLeft, VRight>>}</li>
+ * <li>headers-aware: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>,
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>>}</li>
+ * </ul>
+ * Both variants are wrapped by the same {@link MeteredKeyValueStore} class —
they only differ
Review Comment:
Can't we use `MeteredKeyValueStoreWithHeaders` ?