spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+            final JoinWindows windows,
+            final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has same code as Store.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+            final Duration retentionPeriod,
+            final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
I ran into issues with duplicates earlier and forgot to investigate them. I just did another round of investigation, and the issue is still there: I cannot delete any key when duplicates are retained. This happens in any window store, not just the time-ordered window store.

The sequence that fails is:

1. Add two duplicates with key = 0 and time = 0
```
# this adds a key with seqNum = 0
put(0, "A0", 0)
# this adds a key with seqNum = 1
put(0, "A0-0", 0)
```
2. Delete key = 0 and time = 0
```
# this attempts to delete with seqNum = 2, which does not exist
put(0, null, 0)
```

Initially I didn't think supporting duplicates was necessary, but I just wrote a test case against the old semantics and duplicates are processed there, so I need to support them. Do you know whether deleting duplicates has always been unsupported, or am I missing some API or workaround?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
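
The failing delete described in the review comment can be reproduced with a toy model. This is a minimal sketch, assuming (as the comment describes) that a duplicate-retaining store appends a sequence number to each stored key and advances it on every `put`, including a `put` with a `null` value. The class `DuplicateWindowStoreSketch`, its composite-key format, and its counter are hypothetical and are not Kafka's actual store code.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka's window store implementation.
class DuplicateWindowStoreSketch {
    // Entries are stored under a composite of (key, timestamp, seqNum).
    private final Map<String, String> entries = new TreeMap<>();
    // Hypothetical counter, advanced on every put, deletes included.
    private int seqNum = 0;

    private static String compositeKey(int key, long timestamp, int seq) {
        return key + "@" + timestamp + "#" + seq;
    }

    // A null value is treated as a delete, mirroring put(0, null, 0) in the comment.
    void put(int key, String value, long timestamp) {
        String composite = compositeKey(key, timestamp, seqNum++);
        if (value == null) {
            // The delete targets a fresh seqNum, so it matches no stored entry.
            entries.remove(composite);
        } else {
            entries.put(composite, value);
        }
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        DuplicateWindowStoreSketch store = new DuplicateWindowStoreSketch();
        store.put(0, "A0", 0L);    // stored with seqNum = 0
        store.put(0, "A0-0", 0L);  // stored with seqNum = 1
        store.put(0, null, 0L);    // tries to delete seqNum = 2, which does not exist
        System.out.println(store.size()); // prints 2: both duplicates survive
    }
}
```

In this model a delete can only succeed if it targets the sequence number of an existing entry, which the plain `put(key, null, timestamp)` call has no way to express.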