guozhangwang commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r700479414



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -611,6 +611,7 @@ public synchronized Topology build() {
      */
     public synchronized Topology build(final Properties props) {
         internalStreamsBuilder.buildAndOptimizeTopology(props);
+        internalTopologyBuilder.setTopologyProperties(props);

Review comment:
       nit: add a TODO here noting that, for now, we always set the overrides to the same values as the global application props?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyStreamsBuilder.java
##########
@@ -17,35 +17,24 @@
 package org.apache.kafka.streams.processor.internals.namedtopology;
 
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Properties;
 
 public class NamedTopologyStreamsBuilder extends StreamsBuilder {
-    final String topologyName;
-
     /**
      * @param topologyName  any string representing your NamedTopology, all characters allowed except for "__"
      * @throws IllegalArgumentException if the name contains the character sequence "__"
      */
     public NamedTopologyStreamsBuilder(final String topologyName) {
-        super();
-        this.topologyName = topologyName;
+        super(new NamedTopology(topologyName));

Review comment:
       Nice cleanup!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -133,7 +135,9 @@
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    private StreamsConfig config = null;
+    private StreamsConfig applicationConfig = null;  // the global streams configs and default topology props
+    private Properties topologyProperties = null;    // this topology's config overrides

Review comment:
       I also feel these three var names are a bit confusing. I'd rename `topologyProperties` to `topologyOverrides`, which reads better.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
##########
@@ -101,7 +101,7 @@ void removeRevokedUnknownTasks(final Set<TaskId> assignedTasks) {
                 final ProcessorStateManager stateManager = new ProcessorStateManager(
                     taskId,
                     Task.TaskType.STANDBY,
-                    StreamThread.eosEnabled(config),
+                    StreamThread.eosEnabled(applicationConfig),

Review comment:
       This is not for this PR, but the function `StreamThread.eosEnabled` was originally introduced in the `StreamThread` class as a quick hack, since adding it to `StreamsConfig` would change the public API and require a KIP. It is used in 1) the streams producer, 2) tasks, and 3) tests. We have now removed usage 2) via TaskConfigs (which is an internal class), and I feel that in the long run use case 1) may also be served by another internal class, say ApplicationConfig or even TopologyConfig (if we can manage to have separate values of this within a single app in the future). At that point we should migrate this function out of `StreamThread`.
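       To make this concrete, here is a minimal sketch of what such an internal holder for application-level settings might look like, so that `eosEnabled` no longer needs to live in `StreamThread`. The class name `ApplicationConfigSketch` is hypothetical, and it reads the raw `processing.guarantee` property from a plain `Properties` object instead of going through `StreamsConfig`; that simplification is an assumption made to keep the example dependency-free.

```java
import java.util.Properties;

/**
 * Hypothetical internal holder for application-wide settings (illustrative only,
 * not a Kafka Streams class). Computes eosEnabled once from "processing.guarantee".
 */
public final class ApplicationConfigSketch {
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String AT_LEAST_ONCE = "at_least_once";

    private final boolean eosEnabled;

    public ApplicationConfigSketch(final Properties appProps) {
        // Default to at-least-once when the property is absent.
        final String guarantee = appProps.getProperty(PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
        // Any guarantee other than at-least-once (e.g. exactly_once_v2) is treated as EOS.
        this.eosEnabled = !AT_LEAST_ONCE.equals(guarantee);
    }

    public boolean eosEnabled() {
        return eosEnabled;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        System.out.println(new ApplicationConfigSketch(props).eosEnabled()); // false
        props.setProperty(PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
        System.out.println(new ApplicationConfigSketch(props).eosEnabled()); // true
    }
}
```

       With something like this, producer and task code would call `eosEnabled()` on one shared, internal instance rather than on a static helper in `StreamThread`.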

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
+ * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+    static {
+        CONFIG = new ConfigDef()
+             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                     Type.INT,
+                     null,
+                     Importance.LOW,
+                     BUFFERED_RECORDS_PER_PARTITION_DOC)
+            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                    Type.CLASS,
+                    null,
+                    Importance.MEDIUM,
+                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                     Type.CLASS,
+                     null,
+                     Importance.MEDIUM,
+                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
+             .define(MAX_TASK_IDLE_MS_CONFIG,
+                     Type.LONG,
+                     null,
+                     Importance.MEDIUM,
+                     MAX_TASK_IDLE_MS_DOC)
+             .define(TASK_TIMEOUT_MS_CONFIG,
+                     Type.LONG,
+                     null,
+                     Importance.MEDIUM,
+                     TASK_TIMEOUT_MS_DOC);
+    }
+    private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+
+    public final String topologyName;
+    public final boolean eosEnabled;
+
+    final long maxTaskIdleMs;
+    final long taskTimeoutMs;
+    final int maxBufferedSize;
+    final Supplier<TimestampExtractor> timestampExtractorSupplier;
+    final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
+
+    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
+        super(CONFIG, topologyOverrides, false);
+
+        this.topologyName = topologyName;
+        this.eosEnabled = StreamThread.eosEnabled(globalAppConfigs);
+
+        if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
+            maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+        } else {
+            maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG);
+        }
+
+        if (isTopologyOverride(TASK_TIMEOUT_MS_CONFIG, topologyOverrides)) {
+            taskTimeoutMs = getLong(TASK_TIMEOUT_MS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, TASK_TIMEOUT_MS_CONFIG, taskTimeoutMs);
+        } else {
+            taskTimeoutMs = globalAppConfigs.getLong(TASK_TIMEOUT_MS_CONFIG);
+        }
+
+        if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) {
+            maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
+        } else {
+            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        }
+
+        if (isTopologyOverride(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, topologyOverrides)) {
+            timestampExtractorSupplier = () -> getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+            log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, getClass(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG));
+        } else {
+            timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        }
+
+        if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
+            deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+            log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG));
+        } else {
+            deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+
+        }
+    }
+
+    /**
+     * @return true if there is an override for this config in the properties of this NamedTopology. Applications that
+     *         don't use named topologies will just refer to the global defaults regardless of the topology properties
+     */
+    private boolean isTopologyOverride(final String config, final Properties topologyOverrides) {
+        return topologyName != null && topologyOverrides.containsKey(config);

Review comment:
       I think `topologyName == null` only when there's no named topology, which could still be quite common since people are not yet aware of the new APIs. In that case, logging a warning may be surprising to them.
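       For illustration, a dependency-free sketch of the override-or-fallback logic in the `TopologyConfig` constructor, with the behavior suggested here: when `topologyName` is null, any overrides are ignored and the global value is used without a warning. `OverrideResolutionSketch` and `resolveLong` are hypothetical names, configs are plain `Properties` rather than `StreamsConfig`/`AbstractConfig`, and log lines are collected in a list instead of going through SLF4J.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical, simplified sketch of per-topology override resolution (not the real TopologyConfig). */
public final class OverrideResolutionSketch {
    // Stand-in for a logger so the example has no dependencies.
    static final List<String> LOG = new ArrayList<>();

    static boolean isTopologyOverride(final String topologyName, final String key, final Properties overrides) {
        // Unnamed topologies (topologyName == null) are treated as having no overrides.
        return topologyName != null && overrides != null && overrides.containsKey(key);
    }

    static long resolveLong(final String topologyName, final String key,
                            final Properties overrides, final Properties appConfigs) {
        if (isTopologyOverride(topologyName, key, overrides)) {
            final long value = Long.parseLong(overrides.getProperty(key));
            LOG.add(String.format("INFO Topology %s is overriding %s to %d", topologyName, key, value));
            return value;
        }
        // Silent fallback to the application-level value: no warning for users of the plain API.
        return Long.parseLong(appConfigs.getProperty(key));
    }

    public static void main(final String[] args) {
        final Properties appConfigs = new Properties();
        appConfigs.setProperty("max.task.idle.ms", "0");
        final Properties overrides = new Properties();
        overrides.setProperty("max.task.idle.ms", "500");

        System.out.println(resolveLong("orders", "max.task.idle.ms", overrides, appConfigs)); // 500
        System.out.println(resolveLong(null, "max.task.idle.ms", overrides, appConfigs));     // 0
        LOG.forEach(System.out::println); // only the "orders" override is logged
    }
}
```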

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId
         return this;
     }
 
-    public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) {
-        Objects.requireNonNull(config, "config can't be null");
-        this.config = config;
+    public synchronized final void setTopologyProperties(final Properties props) {
+        this.topologyProperties = props;
+    }
 
-        return this;
+    public synchronized final void setStreamsConfig(final StreamsConfig config) {

Review comment:
       This is a meta comment: I feel we can now try to reorder the topology-building steps in a better way. Today they happen in the following order, which looks awkward to me:
   
   1) InternalStreamsBuilder#internalTopologyBuilder(props): this does the logical plan -> physical plan build. In the long run, `props` should hold the per-topology overrides.
   2) InternalStreamsBuilder#rewriteTopology(StreamsConfig config): this does some post-plan-generation modifications which, ideally, should be part of 1) as well. Here `config` is at the global application level.
   3) Inside 2) we call `setStreamsConfig(config);`, which sets the global application configs.
   
   I think a better order would be to set the global configs first and then the per-topology overrides. So we should consider setting the global configs at the beginning, when constructing the Topology (either from StreamsBuilder with the DSL or directly with the PAPI), and then the per-topology props for overrides.
   
   Of course, the reason for this awkwardness is that with the PAPI the Topology is built without any "props" passed in, so we can only rely on the last step 3) to make sure the configs are finally initialized. But now, with the new `NamedTopology`, we can bypass this by requiring it to always come with props at construction time as well, which would be used to set both (the function names are just for illustration):
   
   * internalTopologyBuilder.setTopologyOverrides(props);
   * internalTopologyBuilder.setApplicationConfigs(config);
   
   We would end up with the following scenarios:
   
   1. No named topology, DSL: props are set at StreamsBuilder.build(), and we know there are no overrides.
   2. No named topology, PAPI: the only remaining pain point, since we have to wait until InternalTopologyBuilder.rewriteTopology, but we still know there are no overrides.
   3. Named topology, DSL: props are set in the constructor, for both the application-level configs and the overrides.
   4. Named topology, PAPI: currently no one should be using it this way, but in case it is supported, it is the same as 3) above.
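   A minimal sketch of the proposed ordering, using hypothetical factory methods rather than the real `InternalTopologyBuilder` setters: application-level configs are always supplied first, unnamed topologies (scenarios 1 and 2) never carry overrides, and named topologies (scenarios 3 and 4) must supply their props at construction time. In scenario 2 the equivalent call would still only happen late, during rewriteTopology, as described above.

```java
import java.util.Objects;
import java.util.Properties;

/** Hypothetical sketch (not the Kafka Streams API): configs supplied when the topology is constructed. */
public final class TopologyConfigsAtConstruction {
    final String topologyName;            // null for an unnamed topology
    final Properties applicationConfigs;  // global application-level configs, always set first
    final Properties topologyOverrides;   // per-topology overrides, empty for unnamed topologies

    private TopologyConfigsAtConstruction(final String topologyName,
                                          final Properties applicationConfigs,
                                          final Properties topologyOverrides) {
        this.topologyName = topologyName;
        this.applicationConfigs = Objects.requireNonNull(applicationConfigs, "application configs can't be null");
        this.topologyOverrides = Objects.requireNonNull(topologyOverrides, "topology overrides can't be null");
    }

    /** Scenarios 1 and 2: no named topology, so there are never any overrides. */
    static TopologyConfigsAtConstruction unnamed(final Properties applicationConfigs) {
        return new TopologyConfigsAtConstruction(null, applicationConfigs, new Properties());
    }

    /** Scenarios 3 and 4: a named topology must come with its props at construction time. */
    static TopologyConfigsAtConstruction named(final String topologyName,
                                               final Properties applicationConfigs,
                                               final Properties topologyOverrides) {
        return new TopologyConfigsAtConstruction(
            Objects.requireNonNull(topologyName, "topology name can't be null"),
            applicationConfigs,
            topologyOverrides);
    }

    boolean hasOverrides() {
        return !topologyOverrides.isEmpty();
    }

    public static void main(final String[] args) {
        final Properties app = new Properties();
        app.setProperty("max.task.idle.ms", "0");
        final Properties overrides = new Properties();
        overrides.setProperty("max.task.idle.ms", "500");

        System.out.println(unnamed(app).hasOverrides());                    // false
        System.out.println(named("orders", app, overrides).hasOverrides()); // true
    }
}
```

   The point of the factory split is that the "no overrides" guarantee for scenarios 1 and 2 is enforced by construction rather than checked later.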
   
   And if in the long run we stick with NamedTopology and deprecate 1/2 above, we would naturally end up in a state where props must be set at topology construction time.
   
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
