spena commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701101588



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -48,15 +50,22 @@ public void configure(final Map<String, ?> configs, final 
boolean isKey) {
 
     @Override
     public KeyAndJoinSide<K> deserialize(final String topic, final byte[] 
data) {
-        final boolean bool = data[0] == 1;
+        final boolean bool = data[8] == 1;

Review comment:
       It should be good to add a constant for the `8` number, so it is easily 
read. for instance, the `rawKey()` has `new byte[data.length - 9]`, which I 
assume is `len - TIMESTAMP - BOOL`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java
##########
@@ -55,9 +57,11 @@ public void configure(final Map<String, ?> configs, final 
boolean isKey) {
     public byte[] serialize(final String topic, final KeyAndJoinSide<K> data) {
         final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
         final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
+        final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.getTimestamp());
 
         return ByteBuffer
-            .allocate(keyBytes.length + 1)
+            .allocate(8 + keyBytes.length + 1)

Review comment:
       Should the `8` be a constant variable or just `timestampBytes.length`? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -63,6 +63,10 @@
     private InternalProcessorContext context;
     private TaskId taskId;
 
+    interface WindowedKeySerde {
+        Bytes serialize(final Bytes key, final long timestamp, final int 
seqnum);
+    }
+

Review comment:
       Is this used somewhere?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -33,172 +39,112 @@
  * For key range queries, like fetch(key, fromTime, toTime), use the {@link 
RocksDBWindowStore}
  * which uses the {@link WindowKeySchema} to serialize the record bytes for 
efficient key queries.
  */
+@SuppressWarnings("unchecked")
 public class RocksDBTimeOrderedWindowStore

Review comment:
       Two things:
   - Seems `TimeOrderedKeySchema` is not needed anymore. Should the class be 
removed?
   - Should we rename the class to remove the `Window` par?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -146,7 +146,7 @@ public void process(final K key, final V1 value) {
 
                     outerJoinWindowStore.ifPresent(store -> {
                         // Delete the joined record from the non-joined outer 
window store
-                        store.put(KeyAndJoinSide.make(!isLeftSide, key), null, 
otherRecordTimestamp);
+                        store.put(KeyAndJoinSide.make(!isLeftSide, key, 
otherRecordTimestamp), null);

Review comment:
       Should you call `store.delete(KeyAndJoinSide.make(!isLeftSide, key, 
otherRecordTimestamp))` now?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final 
WindowStore<KeyAndJoinSide<K>, Left
             // reset to MAX_VALUE in case the store is empty
             sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
LeftOrRightValue> it = store.all()) {
+            try (final KeyValueIterator<KeyAndJoinSide<K>, LeftOrRightValue> 
it = store.all()) {
                 while (it.hasNext()) {
-                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
LeftOrRightValue> record = it.next();
+                    final KeyValue<KeyAndJoinSide<K>, LeftOrRightValue> record 
= it.next();
 
-                    final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
-                    final LeftOrRightValue value = record.value;
-                    sharedTimeTracker.minTime = windowedKey.window().start();
+                    final KeyAndJoinSide<K> keyAndJoinSide = record.key;
+                    final LeftOrRightValue<V1, V2> value = record.value;
+                    final K key = keyAndJoinSide.getKey();
+                    final long timestamp = keyAndJoinSide.getTimestamp();
+                    sharedTimeTracker.minTime = timestamp;
 
                     // Skip next records if window has not closed
-                    if (windowedKey.window().start() + joinAfterMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+                    if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
                         break;
                     }
 
-                    final K key = windowedKey.key().getKey();
-                    final long time = windowedKey.window().start();
-
                     final R nullJoinedValue;
                     if (isLeftSide) {
                         nullJoinedValue = joiner.apply(key,
-                            (V1) value.getLeftValue(),
-                            (V2) value.getRightValue());
+                                value.getLeftValue(),
+                                value.getRightValue());
                     } else {
                         nullJoinedValue = joiner.apply(key,
-                            (V1) value.getRightValue(),
-                            (V2) value.getLeftValue());
+                                (V1) value.getRightValue(),
+                                (V2) value.getLeftValue());
                     }
 
-                    context().forward(key, nullJoinedValue, 
To.all().withTimestamp(time));
+                    context().forward(key, nullJoinedValue, 
To.all().withTimestamp(timestamp));
 
                     // Delete the key from the outer window store now it is 
emitted
-                    store.put(record.key.key(), null, 
record.key.window().start());
+                    store.put(keyAndJoinSide, null);

Review comment:
       Should you call `store.delete(keyAndJoinSide)` now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to