spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+                                                                             final Duration retentionPeriod,
+                                                                             final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       I had issues with duplicates and forgot to investigate them. I just did another round of investigation, but I still run into the same problem: I cannot delete any key when duplicates are retained. This happens in any window store, not just the time-ordered window store.
   
   The problem I found is:
   
   1. Add two duplicates with key = 0 and time = 0
   ```
   # this adds a key with seqNum = 0
   put(0, "A0", 0) 
   # this adds a key with seqNum = 1
   put(0, "A0-0", 0)
   ```
   2. Delete key = 0 and time = 0
   ```
   # this attempts to delete with seqNum = 2, which does not exist
   put(0, null, 0)
   ```
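   
   To make the failure mode concrete, here is a minimal toy sketch of the two steps above. It is not Kafka's store code: `DuplicateDeleteSketch` and `ToyDuplicateWindowStore` are hypothetical names, and the composite string key only mimics a "key + window start time + seqNum" layout. The sketch assumes, as described above, that every `put()` with retained duplicates takes the next sequence number, including the `put(key, null, time)` used for deletion.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class DuplicateDeleteSketch {
   
       // Hypothetical toy model, NOT Kafka's RocksDB window store. Assumption: with
       // duplicates retained, every put() appends the next sequence number to the
       // stored key, so duplicates of the same (key, time) pair never overwrite each other.
       static final class ToyDuplicateWindowStore {
           private final Map<String, String> entries = new LinkedHashMap<>();
           private int seqNum = 0;
   
           // Composite key that mimics a "key + window start time + seqNum" layout.
           private static String storeKey(final int key, final long time, final int seq) {
               return key + "@" + time + "#" + seq;
           }
   
           void put(final int key, final String value, final long time) {
               // The sequence number is incremented on every put, including a null "delete".
               final String compositeKey = storeKey(key, time, seqNum++);
               if (value == null) {
                   // A delete removes only this exact composite key, which was never written.
                   entries.remove(compositeKey);
               } else {
                   entries.put(compositeKey, value);
               }
           }
   
           Map<String, String> entries() {
               return entries;
           }
       }
   
       public static void main(final String[] args) {
           final ToyDuplicateWindowStore store = new ToyDuplicateWindowStore();
           store.put(0, "A0", 0L);   // stored with seqNum = 0
           store.put(0, "A0-0", 0L); // stored with seqNum = 1
           store.put(0, null, 0L);   // attempts to delete seqNum = 2, which does not exist
           System.out.println(store.entries()); // prints {0@0#0=A0, 0@0#1=A0-0}
       }
   }
   ```
   
   Both duplicates survive the delete because the tombstone is written under a sequence number that no stored entry has, so in this model the only way to remove them would be to address each (key, time, seqNum) entry individually.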
   
   Initially I didn't think supporting duplicates was necessary, but I just wrote a test case with the old semantics, and duplicates are processed there, so I need to support them. Do you know whether deleting duplicates has always been unsupported, or am I missing some API or workaround?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org