lucasbru commented on code in PR #18233: URL: https://github.com/apache/kafka/pull/18233#discussion_r1952422436
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java: ########## @@ -168,6 +169,15 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs, ); } + if (userProvidedBaseStoreName == null) { + addInternalResourceName(thisWindowStore); + addInternalResourceName(otherWindowStore); + if (outerJoinWindowStore.isPresent()) { + addInternalResourceName(outerJoinWindowStore.get()); + } + + } Review Comment: ```suggestion } } ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -1051,76 +1063,15 @@ public <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table, final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( name != null ? name : this.name, joinedInternal.keySerde(), - joinedInternal.leftValueSerde() + joinedInternal.leftValueSerde(), + name != null ); return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true); } else { return doStreamTableJoin(table, joiner, joinedInternal, true); } } - @SuppressWarnings({"unchecked", "resource"}) Review Comment: Why did you move this method? It seems Matthias moved it on purpose and you are reverting his change here? ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -592,6 +592,11 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + " The cluster must have a client metrics subscription which corresponds to a client."; + /** {@code ensure.explicit.internal.resource.naming} */ + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming"; + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal " + + " topics (e.g., changelog and repartition topics) and their associated state stores." + + " When enabled, the application will refuse to start if any internal resource has an auto-generated name."; /** {@code log.summary.interval.ms} */ Review Comment: nit: missing newline above javadoc ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -592,6 +592,11 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + " The cluster must have a client metrics subscription which corresponds to a client."; + /** {@code ensure.explicit.internal.resource.naming} */ + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming"; + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal " + Review Comment: ```suggestion public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal" + ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2289,4 +2298,45 @@ public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra processorWrapper.wrapProcessorSupplier(name, processorSupplier) ); } + + public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) { + implicitInternalNames.add(internalResourcesNaming); + } + + public void checkUnprovidedNames() { + if (!implicitInternalNames.isEmpty()) { + final StringBuilder result = new StringBuilder(); + final List<String> changelogTopics = new ArrayList<>(); + final List<String> stateStores = new ArrayList<>(); + final List<String> repartitionTopics = new ArrayList<>(); + for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) { + if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) { + changelogTopics.add(internalResourcesNaming.changelogTopic()); + } + if (!Utils.isBlank(internalResourcesNaming.stateStore())) { + stateStores.add(internalResourcesNaming.stateStore()); + } + if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) { + repartitionTopics.add(internalResourcesNaming.repartitionTopic()); + } + } + if (!changelogTopics.isEmpty()) { + result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics))); + } + if (!stateStores.isEmpty()) { + result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores))); + } + if (!repartitionTopics.isEmpty()) { + result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics))); + } + if (ensureExplicitInternalResourceNaming) { + throw new TopologyException(result.toString()); + } else { + log.warn("Enforce explicit naming for all internal resources is set to false. If you want" + Review Comment: I'm not sure I'm mega happy with this log message, as every user trying a basic example of Kafka Streams will be confronted with it. But I see it was part of the KIP and this proposal was accepted. I added a little note to the VOTE thread, to point out that this log was part of the KIP - I had missed it. But I'm okay with going forward with the KIP as accepted if nobody shares my concerns. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -64,12 +66,15 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; + Review Comment: ```suggestion ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ########## @@ -384,7 +384,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>( - Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(), Review Comment: Ah, yes. Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org