ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669210507



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a 
bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
       Sure, but if `builders.isEmpty()` then we would enter the `if` block above 
and return before reaching this section of the code. But I think maybe you 
meant that in `hasNoNonGlobalTopology`, we should actually return true only if 
_all_ builders have no non-global topology, not if that's true for _any one_ of 
them? There's some argument to be made for how to handle the case where some 
named topologies are legit, while others are empty, but I would still advocate 
for throwing an exception when _any_ topology is empty since this is not a 
valid configuration. In that case, the current code is correct, but the 
comment is not. I'll fix the misleading comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to