ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669196718



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual 
InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR 
#10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to 
use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep 
sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> 
builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty 
topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a 
bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+            log.error("Topology with no input topics will create no stream 
threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no 
global threads, " +
+                                            "must subscribe to at least one 
source topic or global table.");
+        }
+
+        // Lastly we check for an empty non-global topology and override the 
threads to zero if set otherwise
+        if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for 
global-only topology");
+            return 0;
+        }
+
+        return configuredNumStreamThreads;
+    }
+
+    public boolean hasNamedTopologies() {
+        // This includes the case of starting up with no named topologies at 
all
+        return !builders.containsKey(UNNAMED_TOPOLOGY);
+    }
+
+    public boolean hasGlobalTopology() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+    }
+
+    public boolean hasNoNonGlobalTopology() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+    }
+
+    public boolean hasPersistentStores() {
+        // If the app is using named topologies, there may not be any 
persistent state when it first starts up
+        // but a new NamedTopology may introduce it later, so we must return 
true
+        if (hasNamedTopologies()) {
+            return true;
+        }
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasPersistentStores);
+    }
+
+    public boolean hasStore(final String name) {
+        return evaluateConditionIsTrueForAnyBuilders(b -> b.hasStore(name));
+    }
+
+    public boolean hasOffsetResetOverrides() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
+    }
+
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final OffsetResetStrategy resetStrategy = 
builder.offsetResetStrategy(topic);
+            if (resetStrategy != null) {
+                return resetStrategy;
+            }
+        }
+        return null;
+    }
+
+    Collection<String> sourceTopicCollection() {
+        final List<String> sourceTopics = new ArrayList<>();
+        applyToEachBuilder(b -> 
sourceTopics.addAll(b.sourceTopicCollection()));
+        return sourceTopics;
+    }
+
+    Pattern sourceTopicPattern() {
+        final StringBuilder patternBuilder = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            final String patternString = b.sourceTopicsPatternString();
+            if (patternString.length() > 0) {
+                patternBuilder.append(patternString).append("|");
+            }
+        });
+
+        if (patternBuilder.length() > 0) {
+            patternBuilder.setLength(patternBuilder.length() - 1);
+            return Pattern.compile(patternBuilder.toString());
+        } else {
+            return EMPTY_ZERO_LENGTH_PATTERN;
+        }
+    }
+
+    public boolean usesPatternSubscription() {
+        return 
evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::usesPatternSubscription);
+    }
+
+    // Can be empty if app is started up with no Named Topologies, in order to 
add them on later
+    public boolean isEmpty() {
+        return builders.isEmpty();
+    }
+
+    public String topologyDescription() {
+        if (isEmpty()) {
+            return "";
+        }
+        final StringBuilder sb = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            sb.append(b.describe().toString());
+        });
+
+        return sb.toString();
+    }
+
+    public final void buildAndRewriteTopology() {
+        applyToEachBuilder(builder -> {
+            builder.rewriteTopology(config);
+            builder.buildTopology();
+
+            // As we go, check each topology for overlap in the set of input 
topics/patterns
+            final int numInputTopics = allInputTopics.size();
+            final List<String> inputTopics = builder.fullSourceTopicNames();
+            final Collection<String> inputPatterns = 
builder.allSourcePatternStrings();
+
+            final int numNewInputTopics = inputTopics.size() + 
inputPatterns.size();
+            allInputTopics.addAll(inputTopics);
+            allInputTopics.addAll(inputPatterns);
+            if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+                inputTopics.retainAll(allInputTopics);
+                inputPatterns.retainAll(allInputTopics);
+                inputTopics.addAll(inputPatterns);
+                log.error("Tried to add the NamedTopology {} but it had 
overlap with other input topics: {}", builder.namedTopology(), inputTopics);
+                throw new TopologyException("Named Topologies may not 
subscribe to the same input topics or patterns");
+            }
+
+            final ProcessorTopology globalTopology = 
builder.buildGlobalStateTopology();
+            if (globalTopology != null) {
+                if (builder.namedTopology() != null) {
+                    throw new IllegalStateException("Global state stores are 
not supported with Named Topologies");

Review comment:
       It's not likely to be compatible in the first phase, but I think we 
would want it to be fully supported in the end, i.e., as part of the KIP once 
we have everything worked out and are feeling ready for that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to