wcarlson5 commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r697495900



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -67,7 +68,7 @@
             stateDirectory,
             stateMgr,
             inputPartitions,
-            config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG),

Review comment:
       Do we want to use the same pattern for task-level configs as we do for 
Streams configs?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -262,7 +262,7 @@ private StreamTask createActiveTask(final TaskId taskId,
             inputPartitions,
             topology,
             consumer,
-            config,
+            topologyMetadata.getTaskConfigFor(taskId),

Review comment:
       Maybe we take config as well for any overrides? I don't know which would 
take precedence. Or maybe check for conflicts?
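
       To make the two options concrete, here is a rough sketch using plain 
`java.util.Properties`. It is not the PR's code: the class and method names are 
hypothetical, and it assumes topology-level overrides win over the 
application-level values, which is exactly the open question.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class ConfigPrecedenceSketch {

    // Keys set in both places with different values, so a caller could log or reject them.
    static Set<String> conflicts(final Properties appConfig, final Properties overrides) {
        final Set<String> result = new TreeSet<>();
        for (final String key : overrides.stringPropertyNames()) {
            final String appValue = appConfig.getProperty(key);
            if (appValue != null && !appValue.equals(overrides.getProperty(key))) {
                result.add(key);
            }
        }
        return result;
    }

    // Assumption: topology-level overrides take precedence over application-level values.
    static Properties merge(final Properties appConfig, final Properties overrides) {
        final Properties merged = new Properties();
        merged.putAll(appConfig);   // start from the application-wide values
        merged.putAll(overrides);   // later putAll replaces any duplicate keys
        return merged;
    }
}
```

Swapping the two `putAll` calls would give the opposite precedence, and 
`conflicts` could back a warning or a hard failure instead of a silent override.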

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId
         return this;
     }
 
-    public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) {
-        Objects.requireNonNull(config, "config can't be null");
-        this.config = config;
+    public synchronized final void setTopologyProperties(final Properties props) {
+        this.topologyProperties = props;
+    }
 
-        return this;
+    public synchronized final void setStreamsConfig(final StreamsConfig config) {
+        Objects.requireNonNull(config, "config can't be null");
+        this.applicationConfig = config;
+        topologyConfigs = new TopologyConfig(
+            topologyName,
+            applicationConfig,
+            topologyProperties == null ?

Review comment:
       Do we use `Optional` in Streams? This would be a good place for it.
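
       Something along these lines, as a minimal sketch. The helper name is 
hypothetical, and falling back to an empty `Properties` is an assumption, since 
the other branch of the ternary is cut off in this hunk.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the PR.
final class TopologyOverridesSketch {

    // Returns the given overrides, or an empty Properties when none were set (null).
    static Properties overridesOrEmpty(final Properties topologyProperties) {
        return Optional.ofNullable(topologyProperties).orElseGet(Properties::new);
    }
}
```

This keeps the null handling in one expression instead of an inline ternary at 
the call site.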

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -133,7 +135,9 @@
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    private StreamsConfig config = null;
+    private StreamsConfig applicationConfig = null;  // the global streams configs and default topology props
+    private Properties topologyProperties = null;    // this topology's config overrides

Review comment:
       I am not clear on the difference from the names alone. Can we just name 
it `topologyOverrides`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.