mjsax commented on a change in pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#discussion_r827380865
##########
File path: streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
##########
@@ -153,6 +166,17 @@ public TopologyConfig(final String topologyName, final
StreamsConfig globalAppCo
} else {
deserializationExceptionHandlerSupplier = () ->
globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
}
+
+ // Override the default store type config no matter it's a named
topology or not since it should apply to all topologies
+ storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+ log.info("Topology {} is overriding {} to {}", topologyName,
DEFAULT_DSL_STORE_CONFIG, storeType);
Review comment:
Why do we log this unconditionally? Should we not have a overwrite for
`isTopologyOverride`?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
##########
@@ -73,4 +79,24 @@ public void shouldUseStoreNameOfSupplierWhenProvided() {
new MaterializedInternal<>(Materialized.as(supplier),
nameProvider, prefix);
assertThat(materialized.storeName(), equalTo(storeName));
}
+
+ @Test
+ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
+ final Properties topologyOverrides = new Properties();
+ topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG,
StreamsConfig.IN_MEMORY);
+ final StreamsConfig config = new
StreamsConfig(StreamsTestUtils.getStreamsConfig());
+
+ final InternalTopologyBuilder topologyBuilder = new
InternalTopologyBuilder(
+ new TopologyConfig(
+ "my-topology",
Review comment:
nit: indention
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
##########
@@ -259,4 +285,16 @@ protected Materialized(final Materialized<K, V, S>
materialized) {
this.retention = retention;
return this;
}
+
+ /**
+ * Set the type of the materialized {@link StateStore}.
+ *
+ * @param storeType the store type {@link StoreType} to use.
+ * @return itself
+ */
+ public Materialized<K, V, S> withStoreType(final StoreType storeType)
throws IllegalArgumentException {
+ Objects.requireNonNull(storeType, "store type can't be null");
Review comment:
Should we set `storeSupplier = null` here? Or throw an exception (or log
a WARN)? It seems not to make sense to have both `storeType` and
`storeSupplier` being set?
##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -1164,15 +1172,19 @@ public void
kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
" <-- KSTREAM-SOURCE-0000000000\n\n",
describe.toString()
);
+
+
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(true));
Review comment:
Not sure if we should piggy-back this onto an existing test?
If we want to by systematic, it might be better to parametrize the test and
let all tests re-run with "default" as well as setting RocksDB and in-memory
expliclity (either via config or via parameter call)?
It seems random to set RocksDB for some tests and in-memory for others?
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -81,14 +81,19 @@ public StreamsBuilder() {
internalStreamsBuilder = new
InternalStreamsBuilder(internalTopologyBuilder);
}
- protected StreamsBuilder(final TopologyConfig topologyConfigs) {
+ /**
+ * Create a {@code StreamsBuilder} instance.
+ *
+ * @param topologyConfigs the streams configs that apply at the
topology level. Please refer to {@link TopologyConfig} for more detail
+ */
+ public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = getNewTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new
InternalStreamsBuilder(internalTopologyBuilder);
}
protected Topology getNewTopology(final TopologyConfig topologyConfigs) {
Review comment:
What is the purpose of this method? Seems the constructor could just be
called directly? Should we remove it?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
##########
@@ -121,11 +121,22 @@
+ " grace=[" + sessionWindows.gracePeriodMs() + "],"
+ " retention=[" + retentionPeriod + "]");
}
- supplier = Stores.persistentSessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
+
+ switch (materialized.storeType()) {
+ case IN_MEMORY:
+ supplier = Stores.inMemorySessionStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod)
+ );
+ break;
+ default:
Review comment:
Can we make this a `case ROCK_DB` and add a
```
default:
throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
```
Similar elsewhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]