mjsax commented on a change in pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#discussion_r827380865



##########
File path: streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
##########
@@ -153,6 +166,17 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
         } else {
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
+
+        // Override the default store type config no matter it's a named topology or not since it should apply to all topologies
+        storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);

Review comment:
       Why do we log this unconditionally? Should we not guard this with an `isTopologyOverride` check?
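A minimal sketch of the guarded logging being asked about, using stand-in types (the `isTopologyOverride` flag and the method names here are illustrative, not actual `TopologyConfig` members):

```java
// Hypothetical stand-in for TopologyConfig, showing one way to log the
// store-type override only when this topology actually overrides the config.
class TopologyConfigSketch {
    private final String topologyName;
    private final boolean isTopologyOverride; // stand-in: did this named topology override the config?
    private String storeType;

    TopologyConfigSketch(final String topologyName, final boolean isTopologyOverride) {
        this.topologyName = topologyName;
        this.isTopologyOverride = isTopologyOverride;
    }

    String configureStoreType(final String configuredType) {
        storeType = configuredType;
        if (isTopologyOverride) {
            // Log only for genuine per-topology overrides, not the global default.
            System.out.printf("Topology %s is overriding default.dsl.store to %s%n",
                topologyName, storeType);
        }
        return storeType;
    }
}
```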

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
##########
@@ -73,4 +79,24 @@ public void shouldUseStoreNameOfSupplierWhenProvided() {
             new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
+
+    @Test
+    public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
+        final Properties topologyOverrides = new Properties();
+        topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
+        final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
+
+        final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
+            new TopologyConfig(
+            "my-topology",

Review comment:
       nit: indentation

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
##########
@@ -259,4 +285,16 @@ protected Materialized(final Materialized<K, V, S> materialized) {
         this.retention = retention;
         return this;
     }
+
+    /**
+     * Set the type of the materialized {@link StateStore}.
+     *
+     * @param storeType  the store type {@link StoreType} to use.
+     * @return itself
+     */
+    public Materialized<K, V, S> withStoreType(final StoreType storeType) throws IllegalArgumentException {
+        Objects.requireNonNull(storeType, "store type can't be null");

Review comment:
       Should we set `storeSupplier = null` here? Or throw an exception (or log a WARN)? It seems not to make sense to have both `storeType` and `storeSupplier` set at the same time?
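A self-contained sketch of the mutual-exclusion invariant being discussed, using stand-in types (the real `Materialized` builder is not reproduced here); this variant clears any previously set supplier so the two fields are never set at once:

```java
import java.util.Objects;

// Stand-in model of the Materialized builder, illustrating one of the
// options from the review: drop storeSupplier when a storeType is set.
class MaterializedSketch {
    enum StoreType { ROCKS_DB, IN_MEMORY }

    StoreType storeType;
    Object storeSupplier; // stand-in for StoreSupplier<S>

    MaterializedSketch withStoreType(final StoreType storeType) {
        Objects.requireNonNull(storeType, "store type can't be null");
        if (storeSupplier != null) {
            // Alternatively: throw an IllegalStateException, or log a WARN.
            storeSupplier = null;
        }
        this.storeType = storeType;
        return this;
    }
}
```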

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -1164,15 +1172,19 @@ public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
                 "      <-- KSTREAM-SOURCE-0000000000\n\n",
             describe.toString()
         );
+
+        assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));

Review comment:
       Not sure if we should piggy-back this onto an existing test?
   
   If we want to be systematic, it might be better to parametrize the test and let all tests re-run with "default" as well as with RocksDB and in-memory set explicitly (either via config or via parameter call)?
   
   It seems random to set RocksDB for some tests and in-memory for others?
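One way to be systematic, sketched here without the JUnit plumbing (the names and the expectation table below are illustrative): drive the same topology assertion from every store configuration, including the default, instead of picking one type per test:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal model of parameterizing the topology tests over the DSL store
// config; a real version would use JUnit's Parameterized runner and the
// actual hasPersistentLocalStore() assertion from TopologyTest.
class StoreTypeParamSketch {
    // Each configured store type mapped to whether a persistent local store is expected.
    static Map<String, Boolean> expectations() {
        final Map<String, Boolean> expect = new LinkedHashMap<>();
        expect.put("default", true);    // the default resolves to RocksDB
        expect.put("rocksDB", true);
        expect.put("in_memory", false);
        return expect;
    }

    // Stand-in for the per-parameter test body.
    static boolean matchesExpectation(final String config, final boolean hasPersistentLocalStore) {
        return expectations().get(config) == hasPersistentLocalStore;
    }
}
```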

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -81,14 +81,19 @@ public StreamsBuilder() {
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }
 
-    protected StreamsBuilder(final TopologyConfig topologyConfigs) {
+    /**
+     * Create a {@code StreamsBuilder} instance.
+     *
+     * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
+     */
+    public StreamsBuilder(final TopologyConfig topologyConfigs) {
         topology = getNewTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }
 
     protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review comment:
       What is the purpose of this method? Seems the constructor could just be 
called directly? Should we remove it?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
##########
@@ -121,11 +121,22 @@
                     + " grace=[" + sessionWindows.gracePeriodMs() + "],"
                     + " retention=[" + retentionPeriod + "]");
             }
-            supplier = Stores.persistentSessionStore(
-                materialized.storeName(),
-                Duration.ofMillis(retentionPeriod)
-            );
+
+            switch (materialized.storeType()) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemorySessionStore(
+                        materialized.storeName(),
+                        Duration.ofMillis(retentionPeriod)
+                    );
+                    break;
+                default:

Review comment:
       Can we make this a `case ROCKS_DB` and add a
   ```
   default:
     throw new IllegalStateException("Unknown store type: " + materialized.storeType());
   ```
   
   Similar elsewhere.
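A self-contained sketch of the exhaustive switch being proposed (the `Stores` factory calls are stubbed out as strings here, so this is illustrative, not the real supplier code):

```java
// Sketch of handling every StoreType explicitly and failing fast on
// unknown values, as the review suggests.
class StoreSwitchSketch {
    enum StoreType { ROCKS_DB, IN_MEMORY }

    static String supplierFor(final StoreType type) {
        switch (type) {
            case IN_MEMORY:
                return "inMemorySessionStore";   // would be Stores.inMemorySessionStore(...)
            case ROCKS_DB:
                return "persistentSessionStore"; // would be Stores.persistentSessionStore(...)
            default:
                // Guards against a new StoreType being added without a branch here.
                throw new IllegalStateException("Unknown store type: " + type);
        }
    }
}
```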




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

