guozhangwang commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r700479414

########## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ##########
@@ -611,6 +611,7 @@ public synchronized Topology build() {
      */
     public synchronized Topology build(final Properties props) {
         internalStreamsBuilder.buildAndOptimizeTopology(props);
+        internalTopologyBuilder.setTopologyProperties(props);

Review comment:
nit: could we add a TODO here noting that, for now, we always set the overrides to be the same as the global application props?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyStreamsBuilder.java ##########
@@ -17,35 +17,24 @@
 package org.apache.kafka.streams.processor.internals.namedtopology;
 
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Properties;
 
 public class NamedTopologyStreamsBuilder extends StreamsBuilder {
-    final String topologyName;
-
     /**
      * @param topologyName any string representing your NamedTopology, all characters allowed except for "__"
      * @throws IllegalArgumentException if the name contains the character sequence "__"
      */
     public NamedTopologyStreamsBuilder(final String topologyName) {
-        super();
-        this.topologyName = topologyName;
+        super(new NamedTopology(topologyName));

Review comment:
Nice cleanup!

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##########
@@ -133,7 +135,9 @@ private Map<Integer, Set<String>> nodeGroups = null;
-    private StreamsConfig config = null;
+    private StreamsConfig applicationConfig = null; // the global streams configs and default topology props
+    private Properties topologyProperties = null; // this topology's config overrides

Review comment:
I also feel these three variable names are a bit confusing. `topologyOverrides` reads better.
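The NamedTopologyStreamsBuilder Javadoc in the second hunk above allows any characters in a topology name except the sequence "__", and documents an IllegalArgumentException otherwise. Since the new constructor only calls `super(new NamedTopology(topologyName))`, the check presumably lives in `NamedTopology`, which this diff does not show. Below is a minimal stand-alone sketch of that rule. It assumes a plain substring test and a non-null name. `TopologyNameCheck` and `validate` are hypothetical names, not Kafka APIs.

```java
import java.util.Objects;

/**
 * Hypothetical sketch (not Kafka code) of the naming rule stated in the
 * NamedTopologyStreamsBuilder Javadoc: any characters are allowed except "__".
 */
class TopologyNameCheck {
    private TopologyNameCheck() { }

    static String validate(final String topologyName) {
        // Assumption: a named topology always has a non-null name.
        Objects.requireNonNull(topologyName, "topologyName can't be null");
        // Assumption: the rule is a plain substring test on the raw name.
        if (topologyName.contains("__")) {
            throw new IllegalArgumentException(
                "Illegal topology name \"" + topologyName + "\": it must not contain \"__\"");
        }
        return topologyName;
    }

    public static void main(final String[] args) {
        System.out.println(validate("orders-topology.v2")); // prints orders-topology.v2
        try {
            validate("orders__topology");
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A single underscore such as "orders_topology" passes this check. Only the two-character sequence is rejected.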

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java ##########
@@ -101,7 +101,7 @@ void removeRevokedUnknownTasks(final Set<TaskId> assignedTasks) {
             final ProcessorStateManager stateManager = new ProcessorStateManager(
                 taskId,
                 Task.TaskType.STANDBY,
-                StreamThread.eosEnabled(config),
+                StreamThread.eosEnabled(applicationConfig),

Review comment:
This is not for this PR, but the function `StreamThread.eosEnabled` was originally introduced in the `StreamThread` class as a quick hack, since adding it to `StreamsConfig` would make it public API and require a KIP. It is used in 1) the streams producer, 2) tasks, and 3) tests. We have now removed usage 2) by replacing it with TaskConfigs (which is an internal class). In the long run, I feel use-case 1) could also be served by another internal class, say ApplicationConfig or even TopologyConfig (if we can manage to have separate values of this within a single app in the future). At that point we should migrate this function out of `StreamThread`.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
+ * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+    static {
+        CONFIG = new ConfigDef()
+             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                     Type.INT,
+                     null,
+                     Importance.LOW,
+                     BUFFERED_RECORDS_PER_PARTITION_DOC)
+             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                     Type.CLASS,
+                     null,
+                     Importance.MEDIUM,
+                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                     Type.CLASS,
+                     null,
+                     Importance.MEDIUM,
+                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
+             .define(MAX_TASK_IDLE_MS_CONFIG,
+                     Type.LONG,
+                     null,
+                     Importance.MEDIUM,
+                     MAX_TASK_IDLE_MS_DOC)
+             .define(TASK_TIMEOUT_MS_CONFIG,
+                     Type.LONG,
+                     null,
+                     Importance.MEDIUM,
+                     TASK_TIMEOUT_MS_DOC);
+    }
+    private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+
+    public final String topologyName;
+    public final boolean eosEnabled;
+
+    final long maxTaskIdleMs;
+    final long taskTimeoutMs;
+    final int maxBufferedSize;
+    final Supplier<TimestampExtractor> timestampExtractorSupplier;
+    final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
+
+    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
+        super(CONFIG, topologyOverrides, false);
+
+        this.topologyName = topologyName;
+        this.eosEnabled = StreamThread.eosEnabled(globalAppConfigs);
+
+        if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
+            maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+        } else {
+            maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG);
+        }
+
+        if (isTopologyOverride(TASK_TIMEOUT_MS_CONFIG, topologyOverrides)) {
+            taskTimeoutMs = getLong(TASK_TIMEOUT_MS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, TASK_TIMEOUT_MS_CONFIG, taskTimeoutMs);
+        } else {
+            taskTimeoutMs = globalAppConfigs.getLong(TASK_TIMEOUT_MS_CONFIG);
+        }
+
+        if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) {
+            maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
+        } else {
+            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        }
+
+        if (isTopologyOverride(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, topologyOverrides)) {
+            timestampExtractorSupplier = () -> getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+            log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, getClass(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG));
+        } else {
+            timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        }
+
+        if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
+            deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+            log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG));
+        } else {
+            deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+
+        }
+    }
+
+    /**
+     * @return true if there is an override for this config in the properties of this NamedTopology. Applications that
+     * don't use named topologies will just refer to the global defaults regardless of the topology properties
+     */
+    private boolean isTopologyOverride(final String config, final Properties topologyOverrides) {
+        return topologyName != null && topologyOverrides.containsKey(config);

Review comment:
I think `topologyName == null` only when there's no named topology, which could still be quite common since people are not yet aware of the new APIs. In this case, logging a warning may be surprising to them.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##########
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId
         return this;
     }
 
-    public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) {
-        Objects.requireNonNull(config, "config can't be null");
-        this.config = config;
+    public synchronized final void setTopologyProperties(final Properties props) {
+        this.topologyProperties = props;
+    }
 
-        return this;
+    public synchronized final void setStreamsConfig(final StreamsConfig config) {

Review comment:
This is a meta comment: I feel we can now re-order the topology-building steps in a better way. Today we have the following sequence, which looks awkward to me:

1) InternalStreamsBuilder#internalTopologyBuilder(props): this performs the logical plan -> physical plan build. In the long run, the `props` should be the per-topology overrides.
2) InternalStreamsBuilder#rewriteTopology(StreamsConfig config): this performs some post-plan-generation modifications which, ideally, should also be part of 1). Here the `config` is the global application-level config.
3) Inside 2), we call `setStreamsConfig(config);` to set the global application configs.

I think a better order would be to set the global configs first, and then the per-topology overrides. So we should consider setting the global configs at the beginning, when constructing the Topology (either from StreamsBuilder with the DSL, or directly with the PAPI), and then the per-topology props for overrides.

Of course, the reason we have this awkwardness is that for the PAPI, the Topology is built without any "props" passed in, and hence we can only rely on the last step 3) to make sure the configs are finally initialized. But now, with the new `NamedTopology`, we can bypass this by requiring it to always come with props at construction time as well, which would be used to set both (the function names are just for illustration):

* internalTopologyBuilder.setTopologyOverrides(props);
* internalTopologyBuilder.setApplicationConfigs(config);

We would end up with the following scenarios:

1. No named topology, DSL: props are set at StreamsBuilder.build(), and we know there are no overrides.
2. No named topology, PAPI: the only pain point left, as we have to wait until InternalTopologyBuilder.rewriteTopology, but we still know there are no overrides.
3. Named topology, DSL: props are set at construction time, for both the application-level configs and the overrides.
4. Named topology, PAPI: currently no one should be using it this way, but in case it is supported, it is the same as 3) above.

And if in the long run we stick with NamedTopology and deprecate 1/2 above, we would naturally end up in a state where we enforce props to be set at topology construction time. WDYT?
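The ordering proposed in the meta comment can be sketched as a small stand-alone class under stated assumptions. Everything here is hypothetical: `ConfigOrderingSketch` is not Kafka code, and `setApplicationConfigs` / `setTopologyOverrides` reuse the comment's illustrative names rather than real Kafka methods. A plain `Properties` stands in for `StreamsConfig`. The sketch sets global configs first and layers overrides on top. An override only takes effect for a named topology, mirroring `isTopologyOverride` in the TopologyConfig diff, so scenarios 1/2 (no named topology) always resolve to the global value.

```java
import java.util.Objects;
import java.util.Properties;

/**
 * Hypothetical sketch (not Kafka code): global application configs are set first,
 * per-topology overrides second. A Properties object stands in for StreamsConfig.
 */
class ConfigOrderingSketch {
    private final String topologyName;                        // null when no named topology is used
    private Properties applicationConfigs;                    // global configs, set first
    private Properties topologyOverrides = new Properties();  // per-topology overrides, set second

    ConfigOrderingSketch(final String topologyName) {
        this.topologyName = topologyName;
    }

    public void setApplicationConfigs(final Properties configs) {
        this.applicationConfigs = Objects.requireNonNull(configs, "configs can't be null");
    }

    public void setTopologyOverrides(final Properties overrides) {
        // Enforce the proposed order: overrides only make sense on top of global configs.
        if (applicationConfigs == null) {
            throw new IllegalStateException("application configs must be set before topology overrides");
        }
        this.topologyOverrides = Objects.requireNonNull(overrides, "overrides can't be null");
    }

    // Same rule as isTopologyOverride() in the TopologyConfig diff:
    // only a named topology can override a config.
    private boolean isTopologyOverride(final String key) {
        return topologyName != null && topologyOverrides.containsKey(key);
    }

    public String resolve(final String key) {
        if (applicationConfigs == null) {
            throw new IllegalStateException("application configs have not been set");
        }
        return isTopologyOverride(key)
            ? topologyOverrides.getProperty(key)
            : applicationConfigs.getProperty(key);
    }

    public static void main(final String[] args) {
        final Properties global = new Properties();
        global.setProperty("max.task.idle.ms", "0");
        final Properties overrides = new Properties();
        overrides.setProperty("max.task.idle.ms", "1000");

        // Scenario 3 (named topology): the override wins.
        final ConfigOrderingSketch named = new ConfigOrderingSketch("my-topology");
        named.setApplicationConfigs(global);
        named.setTopologyOverrides(overrides);
        System.out.println(named.resolve("max.task.idle.ms")); // prints 1000

        // Scenarios 1/2 (no named topology): props never act as overrides.
        final ConfigOrderingSketch unnamed = new ConfigOrderingSketch(null);
        unnamed.setApplicationConfigs(global);
        unnamed.setTopologyOverrides(overrides);
        System.out.println(unnamed.resolve("max.task.idle.ms")); // prints 0
    }
}
```

The sketch rejects overrides that arrive before the application configs. That is one way to enforce "global first, then overrides" at construction time. Whether the real builder should fail fast like this or defer until `rewriteTopology` (as scenario 2 still requires) is a design choice the sketch does not settle.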