mjsax commented on code in PR #22156:
URL: https://github.com/apache/kafka/pull/22156#discussion_r3365829255


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -145,7 +149,9 @@ public void process(final Record<K, VThis> record) {
 
             // Emit all non-joined records which window has closed
             if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
-                outerJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, record));
+                if (hasOuterJoinStore()) {

Review Comment:
   If we use `Optional` we don't need this helper method.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -248,38 +253,40 @@ private void emitNonJoinedOuterRecords(final 
KeyValueStore<TimestampedKeyAndJoin
                         continue;
                     }
 
-                    final LeftOrRightValue<VLeft, VRight> leftOrRightValue = 
nextKeyValue.value;
-                    forwardNonJoinedOuterRecords(record, 
timestampedKeyAndJoinSide, leftOrRightValue);
+                    forwardNonJoinedOuterRecords(record, 
timestampedKeyAndJoinSide, nextKeyValue.value);

Review Comment:
   Why this change? I think it make the code less readable. It's not easily 
clear what `nextKeyValue.value` is, and using `final LeftOrRightValue<VLeft, 
VRight> leftOrRightValue = nextKeyValue.value;` make it easier to read IMHO.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import 
org.apache.kafka.streams.kstream.internals.AbstractConfigurableStoreFactory;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the 
processor only deals
+ * with a single value shape: {@code 
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.
+ * <p>
+ * The underlying store is one of:
+ * <ul>
+ *   <li>plain: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VLeft, VRight>>}</li>
+ *   <li>headers-aware: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>, 
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>>}</li>

Review Comment:
   I am wondering if we should do the wrapping differently, ie, 
`LeftOrRightValue<ValueTimestampHeaders<VLeft>, ValueTimestampHeaders<VRight>>` 
?
   
   Well, we would also replace `ValueTimestampHeaders` with 
`AggregationWithHeaders` I guess?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import 
org.apache.kafka.streams.kstream.internals.AbstractConfigurableStoreFactory;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the 
processor only deals
+ * with a single value shape: {@code 
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.

Review Comment:
   I don't think we need to store the timestamp -- the timestamp goes into the 
key for this store, because we want to sort all entries by timestamp.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -117,7 +118,10 @@ public void init(final ProcessorContext<K, VOut> context) {
             sharedTimeTracker = 
sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
-                outerJoinStore = outerJoinWindowStoreFactory.map(s -> 
context.getStateStore(s.storeName()));
+                outerJoinStoreWrapper = outerJoinWindowStoreFactory
+                    .map(s -> new OuterJoinStoreWrapper<K, VLeft, 
VRight>(context, s))
+                    .orElse(null);

Review Comment:
   Instead if setting `outerJoinStoreWrapper == null` here, should be just make 
it an `Optional` by itself, similar to previous `outerJoinStore`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java:
##########
@@ -33,9 +32,9 @@ public void put(final Bytes key, final byte[] value) {
         // we need to log the full new list and thus call get() on the inner 
store below
         // if the value is a tombstone, we delete the whole list and thus can 
save the get call
         if (value == null) {
-            log(key, null, internalContext.recordContext().timestamp(), new 
RecordHeaders());
+            log(key, null, internalContext.recordContext().timestamp(), 
internalContext.recordContext().headers());
         } else {
-            log(key, wrapped().get(key), 
internalContext.recordContext().timestamp(), new RecordHeaders());
+            log(key, wrapped().get(key), 
internalContext.recordContext().timestamp(), 
internalContext.recordContext().headers());

Review Comment:
   Not sure if this is correct? Don't we need a 
`ChangeLoddingListValueByteStoreWithHeader` class and get the headers from the 
`valueAndHeaders` bytes array?
   
   Similar to what we do with 
`ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/OuterJoinStoreWrapper.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.DslStoreFormat;
+import org.apache.kafka.streams.KeyValue;
+import 
org.apache.kafka.streams.kstream.internals.AbstractConfigurableStoreFactory;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * Wraps the outer-join store used by {@code KStreamKStreamJoin} so the 
processor only deals
+ * with a single value shape: {@code 
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>}.
+ * <p>
+ * The underlying store is one of:
+ * <ul>
+ *   <li>plain: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VLeft, VRight>>}</li>
+ *   <li>headers-aware: {@code KeyValueStore<TimestampedKeyAndJoinSide<K>, 
ValueTimestampHeaders<LeftOrRightValue<VLeft, VRight>>>}</li>
+ * </ul>
+ * Both variants are wrapped by the same {@link MeteredKeyValueStore} class — 
they only differ

Review Comment:
   Can't we use `MeteredKeyValueStoreWithHeaders`  ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to