guozhangwang commented on a change in pull request #11242:
URL: https://github.com/apache/kafka/pull/11242#discussion_r742533219



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
##########
@@ -224,12 +225,25 @@
                         + " retention=[" + retentionPeriod + "]");
             }
 
-            supplier = Stores.persistentTimestampedWindowStore(
+            // get from default store implementation
+            final StoreImplementation storeImplementation = 
materialized.storeImplementation();

Review comment:
       This is a meta comment: I feel that introducing `StoreImplementation` 
is a bit of overkill here. Could the same be achieved with just an `enum` type, 
say `StoreImplTypes`, kept inside the `Materialized` object, with the 
following creators added to `Stores`:
   
   ```
   Stores#keyValueBytesStoreSupplier(storeImplTypes, storeName, ...)
   Stores#windowBytesStoreSupplier(storeImplTypes, storeName, ...)
   Stores#sessionBytesStoreSupplier(storeImplTypes, storeName, ...)
   ```
   
   Then inside the impl classes like this one we could directly do:
   
   ```
   if ((supplier = materialized.storeSupplier) == null) {
       supplier = Stores.windowBytesStoreSupplier(materialized.storeImplType());
   }
   ```
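
To make the suggestion concrete, here is a rough, self-contained sketch with no Kafka dependency. Everything in it is a hypothetical stand-in for illustration only: `StoreImplType`, `WindowSupplier`, and this version of `windowBytesStoreSupplier` are assumptions, not existing Kafka Streams API.

```java
import java.time.Duration;

public class StoreImplTypeSketch {

    // Hypothetical enum that Materialized would hold instead of a StoreImplementation object.
    enum StoreImplType { ROCKS_DB, IN_MEMORY }

    // Hypothetical stand-in for a window store supplier; it only records what was requested.
    static final class WindowSupplier {
        final String name;
        final boolean persistent;
        final Duration retention;
        final Duration windowSize;

        WindowSupplier(String name, boolean persistent, Duration retention, Duration windowSize) {
            this.name = name;
            this.persistent = persistent;
            this.retention = retention;
            this.windowSize = windowSize;
        }
    }

    // Hypothetical creator of the kind proposed for Stores: it dispatches on the enum.
    static WindowSupplier windowBytesStoreSupplier(StoreImplType type, String name,
                                                   Duration retention, Duration windowSize) {
        switch (type) {
            case IN_MEMORY:
                return new WindowSupplier(name, false, retention, windowSize);
            case ROCKS_DB:
                return new WindowSupplier(name, true, retention, windowSize);
            default:
                throw new IllegalArgumentException("Unknown store type: " + type);
        }
    }

    // Call-site logic: prefer the user-provided supplier, otherwise build one from the enum.
    static WindowSupplier resolve(WindowSupplier userSupplier, StoreImplType type, String name,
                                  Duration retention, Duration windowSize) {
        return userSupplier != null
            ? userSupplier
            : windowBytesStoreSupplier(type, name, retention, windowSize);
    }

    public static void main(String[] args) {
        WindowSupplier s = resolve(null, StoreImplType.IN_MEMORY, "counts",
                                   Duration.ofDays(1), Duration.ofMinutes(5));
        System.out.println(s.name + " persistent=" + s.persistent);
    }
}
```

With this shape the impl classes need only a null check on the user-provided supplier and a single call keyed by the enum, so no extra interface has to be threaded through `Materialized`.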

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
##########
@@ -103,6 +110,21 @@ protected Materialized(final Materialized<K, V, S> 
materialized) {
         return new Materialized<>(storeName);
     }
 
+    /**
+     * Materialize a {@link StateStore} with the store implementation.
+     *
+     * @param storeImplementation  store implementation used to materialize 
the store
+     * alphanumerics, '.', '_' and '-'.
+     * @param <K>       key type of the store
+     * @param <V>       value type of the store
+     * @param <S>       type of the {@link StateStore}
+     * @return a new {@link Materialized} instance with the given storeName
+     */
+    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final 
StoreImplementation storeImplementation) {

Review comment:
       In practice, I would think no user code calls this function, since the 
store implementation is always set via the config? I have not seen any non-test 
code that calls it.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
##########
@@ -224,12 +225,25 @@
                         + " retention=[" + retentionPeriod + "]");
             }
 
-            supplier = Stores.persistentTimestampedWindowStore(
+            // get from default store implementation
+            final StoreImplementation storeImplementation = 
materialized.storeImplementation();
+            if (storeImplementation != null) {
+                supplier = storeImplementation.windowBytesStoreSupplier(
+                    materialized.storeName(),
+                    Duration.ofMillis(retentionPeriod),
+                    Duration.ofMillis(windows.size()),
+                    false);
+            } else {

Review comment:
       Also, since we always default to the `persistent` type in the 
`MaterializedInternal` constructor when the config is not set, we should never 
hit this fallback logic, right? If I read that wrong, could we make sure that 
the `Materialized` constructor hard-codes a persistent type when the config 
value is not set (which should not happen anyway, since we define a default 
value for it)?
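
A minimal sketch of the "resolve once, default to persistent" idea, again without any Kafka dependency. The config key `example.default.store.type` and the `StoreType` enum are hypothetical names chosen for illustration, not real Kafka Streams configs or classes.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public class DefaultStoreTypeSketch {

    // Hypothetical store types; PERSISTENT plays the role of the hard-coded default.
    enum StoreType { PERSISTENT, IN_MEMORY }

    // Hypothetical config key, used only in this sketch.
    static final String STORE_TYPE_CONFIG = "example.default.store.type";

    // Resolve the store type once (e.g. in a constructor) so callers never need a null fallback.
    static StoreType resolve(Map<String, ?> configs) {
        Object raw = configs.get(STORE_TYPE_CONFIG);
        if (raw == null) {
            return StoreType.PERSISTENT; // config not set: hard-code the persistent type
        }
        return StoreType.valueOf(raw.toString().trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Collections.<String, Object>emptyMap()));
        System.out.println(resolve(Collections.singletonMap(STORE_TYPE_CONFIG, "in_memory")));
    }
}
```

If the type is always resolved like this up front, the `else` branch in the hunk above becomes dead code and could be dropped.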



