ableegoldman commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1385859246


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowStoreMaterializer.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {

Review Comment:
   nit: just call it `SessionStoreMaterializer`
   
   Technically they are "session windows", yes, but we have SessionStore vs WindowStore rather than SessionWindowStore vs TimeWindowStore, so, both for clarity and brevity, let's remove the "Window" part of the name. (I get that for the Materializer we actually do have different implementations for different types of time windows, but I think it's best to keep the session window stuff distinct from the time window variations; otherwise it seems like this is just another subset of a WindowStore. I hope that explanation makes sense?)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java:
##########
@@ -108,56 +106,4 @@ public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                 sessionMerger);
     }
-    private StoreFactory materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {

Review Comment:
   I LOVE that we are able to extract all this repeated logic into a single place ❤️



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowStoreMaterializer.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KeyValueStoreMaterializer.class);

Review Comment:
   copy/paste error (I always check b/c I make this exact mistake myself all the time haha)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowStoreMaterializer.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class TimeWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
+
+    private final Windows<?> windows;
+    private final EmitStrategy emitStrategy;
+
+    public TimeWindowStoreMaterializer(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized,
+            final Windows<?> windows,
+            final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.windows = windows;
+        this.emitStrategy = emitStrategy;
+    }
+
+    @Override
+    public StateStore build() {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            final long retentionPeriod = retentionPeriod();
+
+            if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                        + materialized.storeName() + " must be no smaller than its window size plus the grace period."
+                        + " Got size=[" + windows.size() + "],"
+                        + " grace=[" + windows.gracePeriodMs() + "],"
+                        + " retention=[" + retentionPeriod + "]");
+            }
+
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemoryWindowStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.size()),
+                            false
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                            RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false,
+                                    false
+                            ) :
+                            Stores.persistentTimestampedWindowStore(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false
+                            );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        // TODO(agavra): remove before merging, should we do what we do with other stores
+        // and disable caching in the case StrategyType.ON_WINDOW_CLOSE is used?

Review Comment:
   It would be good to have @guozhangwang confirm, or maybe @mjsax or @lihaosky can chime in here, but my understanding is that we have to disable caching when using `ON_WINDOW_CLOSE` because the alternative store implementation has a different ordering scheme than the CachingWindow/SessionStore. That breaks range queries, as we can't properly merge results between the cache and the underlying store. Of course, in the DSL, sliding and session window aggregations use range queries whereas time window aggregations do not.
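To make the ordering argument concrete, here is a toy model of a range query that merges cached entries with stored entries. It is not Kafka's actual merge iterator: `Entry`, `KEY_FIRST`, `TIME_FIRST` and `mergeIsOrdered` are hypothetical, and it simply assumes the cache iterates key-first while the time-ordered store iterates time-first. A standard two-way merge only yields correctly ordered results when both inputs are sorted by the comparator used for the merge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of merging a cache iterator with a store iterator during a range query.
// Entry, KEY_FIRST and TIME_FIRST are hypothetical stand-ins, not Kafka Streams classes.
class CacheMergeOrderingDemo {

    // A windowed key: the record key plus the window start timestamp.
    record Entry(String key, long windowStart) { }

    // Assumed cache ordering: by key, then by window start.
    static final Comparator<Entry> KEY_FIRST =
            Comparator.comparing(Entry::key).thenComparingLong(Entry::windowStart);

    // Assumed time-ordered store layout: by window start, then by key.
    static final Comparator<Entry> TIME_FIRST =
            Comparator.comparingLong(Entry::windowStart).thenComparing(Entry::key);

    // Two-way merge that trusts both inputs to already be sorted by cmp.
    static List<Entry> merge(List<Entry> cache, List<Entry> store, Comparator<Entry> cmp) {
        List<Entry> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < cache.size() && j < store.size()) {
            if (cmp.compare(cache.get(i), store.get(j)) <= 0) {
                out.add(cache.get(i++));
            } else {
                out.add(store.get(j++));
            }
        }
        while (i < cache.size()) {
            out.add(cache.get(i++));
        }
        while (j < store.size()) {
            out.add(store.get(j++));
        }
        return out;
    }

    static boolean isSorted(List<Entry> entries, Comparator<Entry> cmp) {
        for (int k = 1; k < entries.size(); k++) {
            if (cmp.compare(entries.get(k - 1), entries.get(k)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Merges a key-first cache with a store iterated in storeOrder and reports whether
    // the combined result is still in key-first order.
    static boolean mergeIsOrdered(Comparator<Entry> storeOrder) {
        List<Entry> cache = new ArrayList<>(List.of(new Entry("a", 10), new Entry("b", 5)));
        cache.sort(KEY_FIRST);
        List<Entry> store = new ArrayList<>(List.of(
                new Entry("a", 30), new Entry("b", 0), new Entry("c", 20)));
        store.sort(storeOrder);
        return isSorted(merge(cache, store, KEY_FIRST), KEY_FIRST);
    }

    public static void main(String[] args) {
        System.out.println("store key-first:  " + mergeIsOrdered(KEY_FIRST));  // true
        System.out.println("store time-first: " + mergeIsOrdered(TIME_FIRST)); // false
    }
}
```

With the time-first store, the merge emits `c@20` before `a@30`, so a caller expecting key order would see a broken range result. A point lookup for a single windowed key never relies on this interleaving, which is why the ordering mismatch matters for range queries only.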
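The rule that follows from this range-query argument could be written down roughly as below, with the *why* kept next to the decision. This is only a sketch of the reviewer's current (still unconfirmed) understanding: `CachingRule`, `WindowKind`, `EmitType` and `shouldEnableCaching` are hypothetical names, and the two `EmitType` constants are assumed to mirror the values of `EmitStrategy.StrategyType`.

```java
// Hypothetical helper capturing when caching can stay enabled; not a Kafka Streams API.
class CachingRule {

    // Hypothetical stand-in for the kind of windowed aggregation being materialized.
    enum WindowKind { TIME, SLIDING, SESSION }

    // Assumed to mirror the two EmitStrategy.StrategyType values.
    enum EmitType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    static boolean shouldEnableCaching(boolean cachingRequested, WindowKind kind, EmitType emit) {
        if (!cachingRequested) {
            return false;
        }
        if (emit != EmitType.ON_WINDOW_CLOSE) {
            return true;
        }
        // ON_WINDOW_CLOSE swaps in a time-ordered store whose iteration order differs from the
        // caching layer's, so cached and stored results cannot be merged correctly for range
        // queries. Sliding and session window aggregations issue range queries, so caching must
        // stay off for them; time window aggregations only do point lookups.
        return kind == WindowKind.TIME;
    }

    public static void main(String[] args) {
        for (WindowKind kind : WindowKind.values()) {
            System.out.println(kind + " with ON_WINDOW_CLOSE -> caching "
                    + shouldEnableCaching(true, kind, EmitType.ON_WINDOW_CLOSE));
        }
    }
}
```

Keeping the explanation inside the branch means a reader of the materializer sees the reason for disabling caching, not just the fact that it is disabled.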
   And since this seems to be a DSL-specific feature, I guess we just don't care about the incompatible cache ordering for time windows, since we'll only be doing point lookups.
   
   I also noticed that we added a new caching layer specifically to work with the RocksDbIndexedTimeOrderedWindowBytesStore, called [TimeOrderedCachingWindowStore](https://github.com/apache/kafka/pull/12030/files#diff-65aaedea554a4017e0e1a673883e3b73f609f117f71e86541c9d45092bf0dbd2R59). Not sure how that comes into play here, but I think it makes it clear that we do intend to allow caching with time windows (only).
   
   Code-wise, the difference between sliding windows and time windows seems to be that the former is "indexed", as you can see by following the invocations of `RocksDbIndexedTimeOrderedWindowBytesStoreSupplier#create`. Presumably this means the TimeOrderedCachingWindowStore doesn't work for sliding window stores, only for time window stores.
   
   I suppose it would be nice to include a condensed version of this reasoning in the comments in the session and sliding window materializers. Right now they only state that caching _is_ disabled (which we can plainly see from the code), but not _why_ it is disabled, which is the important part that can't be understood from the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
