ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682237513
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -53,44 +56,162 @@ private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); private final StreamsConfig config; - private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability + private final TopologyVersion version; + + private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability private ProcessorTopology globalTopology; - private Map<String, StateStore> globalStateStores = new HashMap<>(); - final Set<String> allInputTopics = new HashSet<>(); + private final Map<String, StateStore> globalStateStores = new HashMap<>(); + private final Set<String> allInputTopics = new HashSet<>(); + + public static class TopologyVersion { + public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version + public Set<String> assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned + public ReentrantLock topologyLock = new ReentrantLock(); + public Condition topologyCV = topologyLock.newCondition(); + } - public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { + public TopologyMetadata(final InternalTopologyBuilder builder, + final StreamsConfig config) { + version = new TopologyVersion(); this.config = config; - builders = new TreeMap<>(); + builders = new ConcurrentSkipListMap<>(); if (builder.hasNamedTopology()) { builders.put(builder.topologyName(), builder); } else { builders.put(UNNAMED_TOPOLOGY, builder); } } - public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) { + public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders, + final StreamsConfig config) { + version = new TopologyVersion(); this.config = config; + this.builders = builders; if (builders.isEmpty()) { - log.debug("Building KafkaStreams app with no empty topology"); + log.debug("Starting up empty KafkaStreams app with no topology"); } } - public int getNumStreamThreads(final StreamsConfig config) { - final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) { + version.assignedNamedTopologies = assignedNamedTopologies; + } - // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later - if (builders.isEmpty()) { - if (configuredNumStreamThreads != 0) { - log.info("Overriding number of StreamThreads to zero for empty topology"); + /** + * @return the set of named topologies that the assignor distributed tasks for during the last rebalance + */ + public Set<String> assignmentNamedTopologies() { + return version.assignedNamedTopologies; + } + + public long topologyVersion() { + return version.topologyVersion.get(); + } + + public void lock() { Review comment: This part of the code has been through a lot of refactoring and after the latest cleanup, I agree we should be able to avoid exposing it at all -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org