guozhangwang commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r708751965



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -112,10 +113,42 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection<NamedTopology> topolog
         );
     }
 
+    /**
+     * Provides a high-level DSL for specifying the processing logic of your 
application and building it into an
+     * independent topology that can be executed by this {@link KafkaStreams}.
+     *
+     * @param topologyName              The name for this topology
+     * @param topologyConfigs           The properties and any config 
overrides for this topology
+     *
+     * @throws IllegalArgumentException if the name contains the character 
sequence "__"
+     */
+    public NamedTopologyBuilder newNamedTopologyBuilder(final String 
topologyName, final Properties topologyConfigs) {
+        if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) {
+            throw new IllegalArgumentException("The character sequence '__' is 
not allowed in a NamedTopology, please select a new name");
+        }
+        return new NamedTopologyBuilder(topologyName, applicationConfigs, 
topologyConfigs);
+    }
+
+    /**
+     * Returns an empty topology for full control over the graph of streams 
and processor nodes that define the processing
+     * logic to be executed by this {@link KafkaStreams}.
+     *
+     * @param topologyName              The name for this topology
+     * @param topologyConfigs           The properties and any config 
overrides for this topology
+     *
+     * @throws IllegalArgumentException if the name contains the character 
sequence "__"
+     */
+    public NamedTopology newNamedTopology(final String topologyName, final 
Properties topologyConfigs) {

Review comment:
       This one seems to be unused? And even for external callers like ksql, I 
think we would go through newNamedTopologyBuilder().build(), right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -359,15 +363,23 @@ public final InternalTopologyBuilder 
setApplicationId(final String applicationId
         return this;
     }
 
-    public synchronized final InternalTopologyBuilder setStreamsConfig(final 
StreamsConfig config) {
-        Objects.requireNonNull(config, "config can't be null");
-        this.config = config;
+    public synchronized final void setTopologyOverrides(final Properties 
props) {

Review comment:
       This function no longer seems to be used in non-test code?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
##########
@@ -927,10 +931,54 @@ public void 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics() {
     }
 
     @Test
-    public void shouldSetStreamsConfigOnRewriteTopology() {
+    public void shouldSetTopologyConfigOnRewriteTopology() {
+        final Properties globalProps = StreamsTestUtils.getStreamsConfig();
+        globalProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);
+        final StreamsConfig globalStreamsConfig = new 
StreamsConfig(globalProps);
+        final InternalTopologyBuilder topologyBuilder = 
builder.rewriteTopology(globalStreamsConfig);
+        assertThat(topologyBuilder.topologyConfig(), equalTo(new 
TopologyConfig(null, globalStreamsConfig, new Properties())));
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, 
equalTo(100L));
+    }
+
+    @Test
+    public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() 
{
+        final Properties topologyOverrides = new Properties();
+        topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
+        topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
+        
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
+        
topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class);
+        
topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);
+        builder.setNamedTopology(new NamedTopology("test-topology", new 
Properties()));
+        builder.setTopologyOverrides(topologyOverrides);
+
+        final StreamsConfig config = new 
StreamsConfig(StreamsTestUtils.getStreamsConfig());
+        final InternalTopologyBuilder topologyBuilder = 
builder.rewriteTopology(config);
+
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxTaskIdleMs, 
equalTo(500L));
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().taskTimeoutMs, 
equalTo(1000L));
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().maxBufferedSize, 
equalTo(15));
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().timestampExtractor.getClass(),
 equalTo(MockTimestampExtractor.class));
+        
assertThat(topologyBuilder.topologyConfig().getTaskConfig().deserializationExceptionHandler.getClass(),
 equalTo(LogAndContinueExceptionHandler.class));
+    }
+
+    @Test
+    public void 
shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {

Review comment:
       Thanks for the updated tests! They are great.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -112,10 +113,42 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection<NamedTopology> topolog
         );
     }
 
+    /**
+     * Provides a high-level DSL for specifying the processing logic of your 
application and building it into an
+     * independent topology that can be executed by this {@link KafkaStreams}.
+     *
+     * @param topologyName              The name for this topology
+     * @param topologyConfigs           The properties and any config 
overrides for this topology
+     *
+     * @throws IllegalArgumentException if the name contains the character 
sequence "__"
+     */
+    public NamedTopologyBuilder newNamedTopologyBuilder(final String 
topologyName, final Properties topologyConfigs) {
+        if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) {
+            throw new IllegalArgumentException("The character sequence '__' is 
not allowed in a NamedTopology, please select a new name");
+        }
+        return new NamedTopologyBuilder(topologyName, applicationConfigs, 
topologyConfigs);
+    }
+
+    /**
+     * Returns an empty topology for full control over the graph of streams 
and processor nodes that define the processing
+     * logic to be executed by this {@link KafkaStreams}.
+     *
+     * @param topologyName              The name for this topology
+     * @param topologyConfigs           The properties and any config 
overrides for this topology
+     *
+     * @throws IllegalArgumentException if the name contains the character 
sequence "__"
+     */
+    public NamedTopology newNamedTopology(final String topologyName, final 
Properties topologyConfigs) {

Review comment:
       nit: in some classes we call it `topologyConfigs` and in others we 
call it `topologyOverrides` (e.g. the NamedTopology constructor, which takes 
`topologyConfigs` but names it `topologyOverrides`). How about just naming all 
of these parameters `topologyConfigs`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to