guozhangwang commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r708751965
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -112,10 +113,42 @@ private KafkaStreamsNamedTopologyWrapper(final Collection<NamedTopology> topolog ); } + /** + * Provides a high-level DSL for specifying the processing logic of your application and building it into an + * independent topology that can be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ + public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName, final Properties topologyConfigs) { + if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) { + throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name"); + } + return new NamedTopologyBuilder(topologyName, applicationConfigs, topologyConfigs); + } + + /** + * Returns an empty topology for full control over the graph of streams and processor nodes that define the processing + * logic to be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ + public NamedTopology newNamedTopology(final String topologyName, final Properties topologyConfigs) { Review comment: This one seems not used? And even in external callers like ksql, I think we would go through the newNamedTopologyBuilder().build() right? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ########## @@ -359,15 +363,23 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId return this; } - public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { - Objects.requireNonNull(config, "config can't be null"); - this.config = config; + public synchronized final void setTopologyOverrides(final Properties props) { Review comment: This function seems not used in non-testing code any more? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ########## @@ -927,10 +931,54 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { } @Test - public void shouldSetStreamsConfigOnRewriteTopology() { + public void shouldSetTopologyConfigOnRewriteTopology() { + final Properties globalProps = StreamsTestUtils.getStreamsConfig(); + globalProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L); + final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps); + final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig); + assertThat(topologyBuilder.topologyConfig(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties()))); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, equalTo(100L)); + } + + @Test + public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { + final Properties topologyOverrides = new Properties(); + topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); + topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); + topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); + topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + builder.setNamedTopology(new NamedTopology("test-topology", new Properties())); + builder.setTopologyOverrides(topologyOverrides); + + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); + final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(config); + + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, equalTo(500L)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().taskTimeoutMs, equalTo(1000L)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxBufferedSize, equalTo(15)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().timestampExtractor.getClass(), equalTo(MockTimestampExtractor.class)); + assertThat(topologyBuilder.topologyConfig().getTaskConfig().deserializationExceptionHandler.getClass(), equalTo(LogAndContinueExceptionHandler.class)); + } + + @Test + public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() { Review comment: Thanks for the updated tests! They are great. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -112,10 +113,42 @@ private KafkaStreamsNamedTopologyWrapper(final Collection<NamedTopology> topolog ); } + /** + * Provides a high-level DSL for specifying the processing logic of your application and building it into an + * independent topology that can be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ + public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName, final Properties topologyConfigs) { + if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) { + throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name"); + } + return new NamedTopologyBuilder(topologyName, applicationConfigs, topologyConfigs); + } + + /** + * Returns an empty topology for full control over the graph of streams and processor nodes that define the processing + * logic to be executed by this {@link KafkaStreams}. + * + * @param topologyName The name for this topology + * @param topologyConfigs The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" + */ + public NamedTopology newNamedTopology(final String topologyName, final Properties topologyConfigs) { Review comment: nit: in some classes we call it `topologyConfigs` and in some others we call it `topologyOverrides` (e.g. the NamedTopology constructor which takes `topologyConfigs` but name it as `topologyOverrides`). How about just make all of these parameters `topologyConfigs`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org