guozhangwang commented on a change in pull request #11242: URL: https://github.com/apache/kafka/pull/11242#discussion_r742533219
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java ########## @@ -224,12 +225,25 @@ + " retention=[" + retentionPeriod + "]"); } - supplier = Stores.persistentTimestampedWindowStore( + // get from default store implementation + final StoreImplementation storeImplementation = materialized.storeImplementation(); Review comment: This is a meta comment: I feel the introduction of the `StoreImplementation` is a bit overkill here. Could the same be achieved with just an `enum` type say "StoreImplTypes" kept inside the Materialize object, and inside `Stores` the added creator would be: ``` Stores#keyValueBytesStoreSupplier(storeImplTypes, storeName, ...) Stores#windowBytesStoreSupplier(storeImplTypes, storeName, ...) Stores#sessionBytesStoreSupplier(storeImplTypes, storeName, ...) ``` Then inside the impl classes like here we would could directly do: ``` if ((supplier = materialized.storeSupplier) == null) { supplier = Stores.windowBytesStoreSupplier(materialized.storeImplType()) } ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java ########## @@ -103,6 +110,21 @@ protected Materialized(final Materialized<K, V, S> materialized) { return new Materialized<>(storeName); } + /** + * Materialize a {@link StateStore} with the store implementation. + * + * @param storeImplementation store implementation used to materialize the store + * alphanumerics, '.', '_' and '-'. + * @param <K> key type of the store + * @param <V> value type of the store + * @param <S> type of the {@link StateStore} + * @return a new {@link Materialized} instance with the given storeName + */ + public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplementation storeImplementation) { Review comment: In practice I thought no users code would call this function since they are all set via the config? I have not seen any non-testing code that calls this. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java ########## @@ -224,12 +225,25 @@ + " retention=[" + retentionPeriod + "]"); } - supplier = Stores.persistentTimestampedWindowStore( + // get from default store implementation + final StoreImplementation storeImplementation = materialized.storeImplementation(); + if (storeImplementation != null) { + supplier = storeImplementation.windowBytesStoreSupplier( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false); + } else { Review comment: Also since we always default to the `persistent` type at the MaterializedInternal constructor if the config is not set, we should never need to hit this fallback logic right? If I read it wrong, could we make sure that at Materialized constructor if the config value is not set (should not be the case since we define its default value indeed?), we then hard-code a persistent type. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org