guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
##########
@@ -66,14 +72,26 @@
);
}
+ Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> currentTopologies) {
+ return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Review comment:
Ditto here (same as in `ActiveTaskCreator`: use the newly added `Utils.filterMap`?).
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
- private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+ private final TopologyVersion version;
+
+ private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
private ProcessorTopology globalTopology;
- private Map<String, StateStore> globalStateStores = new HashMap<>();
- final Set<String> allInputTopics = new HashSet<>();
+ private final Map<String, StateStore> globalStateStores = new HashMap<>();
+ private final Set<String> allInputTopics = new HashSet<>();
+
+ public static class TopologyVersion {
+ public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version
+ public Set<String> assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned
+ public ReentrantLock topologyLock = new ReentrantLock();
+ public Condition topologyCV = topologyLock.newCondition();
+ }
- public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+ public TopologyMetadata(final InternalTopologyBuilder builder,
+ final StreamsConfig config) {
+ version = new TopologyVersion();
this.config = config;
- builders = new TreeMap<>();
+ builders = new ConcurrentSkipListMap<>();
if (builder.hasNamedTopology()) {
builders.put(builder.topologyName(), builder);
} else {
builders.put(UNNAMED_TOPOLOGY, builder);
}
}
- public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+ public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
+ final StreamsConfig config) {
+ version = new TopologyVersion();
this.config = config;
+
this.builders = builders;
if (builders.isEmpty()) {
- log.debug("Building KafkaStreams app with no empty topology");
+ log.debug("Starting up empty KafkaStreams app with no topology");
}
}
- public int getNumStreamThreads(final StreamsConfig config) {
- final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+ public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) {
+ version.assignedNamedTopologies = assignedNamedTopologies;
+ }
- // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
- if (builders.isEmpty()) {
- if (configuredNumStreamThreads != 0) {
- log.info("Overriding number of StreamThreads to zero for empty topology");
+ /**
+ * @return the set of named topologies that the assignor distributed tasks for during the last rebalance
+ */
+ public Set<String> assignmentNamedTopologies() {
+ return version.assignedNamedTopologies;
+ }
+
+ public long topologyVersion() {
+ return version.topologyVersion.get();
+ }
+
+ public void lock() {
+ version.topologyLock.lock();
+ }
+
+ public void unlock() {
+ version.topologyLock.unlock();
+ }
+
+ public InternalTopologyBuilder getBuilderForTopologyName(final String name) {
+ return builders.get(name);
+ }
+
+ /**
+ * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock
+ */
+ public void maybeWaitForNonEmptyTopology() {
+ if (!version.topologyLock.isHeldByCurrentThread()) {
Review comment:
I feel a bit concerned about the "asymmetry" of this function: all other functions acquire the lock internally, while this one must be called by a caller that already holds it, i.e. `handleTopologyUpdatesPhase`. That makes it quite vulnerable to bugs from further edits.
I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` instead: i.e. we first update the named topology, and then, based on the new version, we either wait or re-subscribe and trigger a rebalance. WDYT?
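A minimal sketch of the alternative, assuming a hypothetical `TopologyRegistry` class standing in for `TopologyMetadata`: every method acquires and releases the lock itself, so no caller ever has to call `lock()` first, and the wait returns the version it observed so the caller can decide whether to keep waiting or re-subscribe. Unlike `maybeWaitForNonEmptyTopology` in the diff (which logs and keeps looping when interrupted), the sketch bounds the wait with a timeout and restores the interrupt flag; both are choices of this sketch, not of the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for TopologyMetadata: the lock never escapes this class,
// so callers cannot forget to acquire it (or to release it).
class TopologyRegistry {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition topologyChanged = lock.newCondition();
    private final Set<String> topologyNames = new HashSet<>();
    private long version = 0L;

    /** Adds a topology, bumps the version and wakes up any waiting threads. */
    long register(final String name) {
        lock.lock();
        try {
            topologyNames.add(name);
            version++;
            topologyChanged.signalAll();
            return version;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until at least one topology is registered, the timeout elapses,
     * or the thread is interrupted; returns the version seen when the wait ended.
     */
    long awaitNonEmpty(final long timeoutMs) {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (topologyNames.isEmpty() && remainingNanos > 0) {
                try {
                    remainingNanos = topologyChanged.awaitNanos(remainingNanos);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                    break;
                }
            }
            return version;
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final TopologyRegistry registry = new TopologyRegistry();
        new Thread(() -> registry.register("topology-a")).start();
        System.out.println("woke up at version " + registry.awaitNonEmpty(5_000));
    }
}
```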
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
+ /**
+ * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock
+ */
+ public void maybeWaitForNonEmptyTopology() {
+ if (!version.topologyLock.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Must call lock() before attempting to wait on non-empty topology");
+ }
+ while (isEmpty()) {
+ try {
+ log.debug("Detected that the topology is currently empty, going to wait for something to be added");
+ version.topologyCV.await();
+ } catch (final InterruptedException e) {
+ log.debug("StreamThread was interrupted while waiting on empty topology", e);
+ }
+ }
+ }
+
+ public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+ try {
+ lock();
+ version.topologyVersion.incrementAndGet();
+ log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+ builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder);
+ buildAndVerifyTopology(newTopologyBuilder);
+ version.topologyCV.signalAll();
+ } finally {
+ unlock();
+ }
+ }
+
+ /**
+ * Removes the topology and blocks until all threads on the older version have ack'ed this removal.
+ * IT is guaranteed that no more tasks from this removed topology will be processed
+ */
+ public void unregisterTopology(final String topologyName) {
+ try {
+ lock();
+ version.topologyVersion.incrementAndGet();
+ log.info("Removing NamedTopology {}, latest topology version is {}", topologyName, version.topologyVersion.get());
+ final InternalTopologyBuilder removedBuilder = builders.remove(topologyName);
+ removedBuilder.fullSourceTopicNames().forEach(allInputTopics::remove);
+ removedBuilder.allSourcePatternStrings().forEach(allInputTopics::remove);
+ version.topologyCV.signalAll();
+ } finally {
+ unlock();
+ }
+ }
+
+ public void buildAndRewriteTopology() {
+ applyToEachBuilder(this::buildAndVerifyTopology);
+ }
+
+ private void buildAndVerifyTopology(final InternalTopologyBuilder builder) {
+ builder.rewriteTopology(config);
+ builder.buildTopology();
+
+ // As we go, check each topology for overlap in the set of input topics/patterns
+ final int numInputTopics = allInputTopics.size();
+ final List<String> inputTopics = builder.fullSourceTopicNames();
+ final Collection<String> inputPatterns = builder.allSourcePatternStrings();
+
+ final int numNewInputTopics = inputTopics.size() + inputPatterns.size();
+ allInputTopics.addAll(inputTopics);
+ allInputTopics.addAll(inputPatterns);
+ if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+ inputTopics.retainAll(allInputTopics);
+ inputPatterns.retainAll(allInputTopics);
+ inputTopics.addAll(inputPatterns);
+ log.error("Tried to add the NamedTopology {} but it had overlap with other input topics: {}", builder.topologyName(), inputTopics);
Review comment:
Could we print the `inputTopics` and `inputPatterns` as well in the message?
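One way to act on this, sketched with a hypothetical `InputOverlapChecker` rather than the real `TopologyMetadata`: compute the overlap against the already-registered set before merging, so the message can report the new topology's input topics, its input patterns, and the entries that actually collide. As far as the snippet above shows, `retainAll(allInputTopics)` runs after `addAll`, so it keeps every new entry rather than only the overlapping ones. Like the diff, the sketch keeps topic names and pattern strings in a single set.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not the PR's code: tracks the source topics and pattern
// strings of every registered topology in one set, as the diff does.
class InputOverlapChecker {
    private final Set<String> allInputTopics = new HashSet<>();

    /**
     * Registers the topology's inputs, or returns an error message if any of them
     * is already in use. Nothing is registered when an overlap is found.
     */
    Optional<String> tryRegister(final String topologyName,
                                 final List<String> inputTopics,
                                 final Collection<String> inputPatterns) {
        // Intersect with the existing set *before* merging the new entries into it.
        final Set<String> overlap = new TreeSet<>();
        for (final String topic : inputTopics) {
            if (allInputTopics.contains(topic)) {
                overlap.add(topic);
            }
        }
        for (final String pattern : inputPatterns) {
            if (allInputTopics.contains(pattern)) {
                overlap.add(pattern);
            }
        }
        if (!overlap.isEmpty()) {
            return Optional.of(String.format(
                "Tried to add the NamedTopology %s with input topics %s and input patterns %s,"
                    + " but it overlaps with other topologies on: %s",
                topologyName, inputTopics, inputPatterns, overlap));
        }
        allInputTopics.addAll(inputTopics);
        allInputTopics.addAll(inputPatterns);
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final InputOverlapChecker checker = new InputOverlapChecker();
        checker.tryRegister("a", Arrays.asList("t1", "t2"), Collections.singletonList("p.*"));
        checker.tryRegister("b", Arrays.asList("t2", "t3"), Collections.<String>emptyList())
            .ifPresent(System.out::println);
    }
}
```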
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final Collection<NamedTopology> topolog
(v1, v2) -> {
throw new IllegalArgumentException("Topology names must be unique");
},
- () -> new TreeMap<>())),
+ () -> new ConcurrentSkipListMap<>())),
config),
config,
clientSupplier
);
- for (final NamedTopology topology : topologies) {
- nameToTopology.put(topology.name(), topology);
- }
}
- public NamedTopology getTopologyByName(final String name) {
- if (nameToTopology.containsKey(name)) {
- return nameToTopology.get(name);
- } else {
- throw new IllegalArgumentException("Unable to locate a NamedTopology called " + name);
+ /**
+ * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name
+ */
+ public Optional<NamedTopology> getTopologyByName(final String name) {
+ return Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+ }
+
+ /**
+ * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
+ * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+ * it to begin processing the new topology.
+ *
+ * @throws IllegalArgumentException if this topology name is already in use
+ * @throws IllegalStateException if streams has not been started or has already shut down
+ * @throws TopologyException if this topology subscribes to any input topics or pattern already in use
+ */
+ public void addNamedTopology(final NamedTopology newTopology) {
+ if (hasStartedOrFinishedShuttingDown()) {
+ throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
+ } else if (getTopologyByName(newTopology.name()).isPresent()) {
+ throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
+ " as another of the same name already exists");
}
+ topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
}
- public void addNamedTopology(final NamedTopology topology) {
- nameToTopology.put(topology.name(), topology);
- throw new UnsupportedOperationException();
+ /**
+ * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
+ * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+ * it stops processing the old topology.
+ *
+ * @throws IllegalArgumentException if this topology name cannot be found
+ * @throws IllegalStateException if streams has not been started or has already shut down
+ * @throws TopologyException if this topology subscribes to any input topics or pattern already in use
+ */
+ public void removeNamedTopology(final String topologyToRemove) {
+ if (!isRunningOrRebalancing()) {
+ throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
+ } else if (!getTopologyByName(topologyToRemove).isPresent()) {
+ throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
+ }
+
+ topologyMetadata.unregisterTopology(topologyToRemove);
}
- public void removeNamedTopology(final String namedTopology) {
- throw new UnsupportedOperationException();
+ /**
+ * Do a clean up of the local state directory for this NamedTopology by deleting all data with regard to the
+ * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the ({@link StreamsConfig#STATE_DIR_CONFIG})
+ * <p>
+ * May be called while the Streams is in any state, but only on a {@link NamedTopology} that has already been
+ * removed via {@link #removeNamedTopology(String)}.
+ * <p>
+ * Calling this method triggers a restore of local {@link StateStore}s for this {@link NamedTopology} if it is
+ * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+ *
+ * @throws IllegalStateException if this {@code NamedTopology} hasn't been removed
+ * @throws StreamsException if cleanup failed
+ */
+ public void cleanUpNamedTopology(final String name) {
+ if (getTopologyByName(name).isPresent()) {
+ throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
Review comment:
Also, I checked on the ksql side and found that we first call `metadata.remove(queryId.toString());` and then this function. In that case, would `getTopologyByName` always return nothing? cc @wcarlson5
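The contract described in the javadoc above (clean up only after `removeNamedTopology`) can be sketched with a hypothetical map-backed `NamedTopologyRegistry`: once a name is removed, the `Optional` lookup is empty, so the guard only rejects cleanup of a topology that is still registered. Whether the ksql call path actually removes the topology through Streams before cleaning up is the open question in the comment; the sketch does not answer it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry mirroring the add / remove / clean-up checks in the diff.
class NamedTopologyRegistry {
    private final Map<String, String> topologies = new ConcurrentHashMap<>();

    void add(final String name) {
        if (topologies.putIfAbsent(name, name) != null) {
            throw new IllegalArgumentException("A NamedTopology called " + name + " already exists");
        }
    }

    Optional<String> getTopologyByName(final String name) {
        return Optional.ofNullable(topologies.get(name)); // empty once removed
    }

    void remove(final String name) {
        if (topologies.remove(name) == null) {
            throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + name);
        }
    }

    void cleanUp(final String name) {
        if (getTopologyByName(name).isPresent()) {
            throw new IllegalStateException("Can't clean up local state for an active NamedTopology " + name);
        }
        // Deleting the topology's local state directory would happen here.
    }

    public static void main(final String[] args) {
        final NamedTopologyRegistry registry = new NamedTopologyRegistry();
        registry.add("query-1");
        registry.remove("query-1");
        registry.cleanUp("query-1"); // allowed: the lookup is now empty
        System.out.println("cleaned up query-1");
    }
}
```

Using the return values of `putIfAbsent` and `remove` makes each existence check atomic with its update, whereas a separate `getTopologyByName` check followed by a mutation leaves a small window for a concurrent caller.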
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,18 +136,36 @@ StreamsProducer threadProducer() {
return threadProducer;
}
+ Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final Set<String> currentTopologies) {
+ return unknownTasksToBeCreated.entrySet().stream().filter(t -> currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Review comment:
Use the newly added `Utils.filterMap`?
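For reference, a filter-by-predicate helper of this kind is a few lines of stream code. The sketch below uses a hypothetical `MapFilters.filterMap` and a simplified `TaskKey` in place of `TaskId`, so it makes no claim about the exact signature of Kafka's `Utils.filterMap`; it only shows how `uncreatedTasksForTopologies` shrinks to a single call.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Simplified, hypothetical stand-in for TaskId: just the named topology and a partition.
final class TaskKey {
    private final String namedTopology;
    private final int partition;

    TaskKey(final String namedTopology, final int partition) {
        this.namedTopology = namedTopology;
        this.partition = partition;
    }

    String namedTopology() {
        return namedTopology;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaskKey)) {
            return false;
        }
        final TaskKey other = (TaskKey) o;
        return partition == other.partition && namedTopology.equals(other.namedTopology);
    }

    @Override
    public int hashCode() {
        return Objects.hash(namedTopology, partition);
    }

    @Override
    public String toString() {
        return namedTopology + "_" + partition;
    }
}

final class MapFilters {
    private MapFilters() { }

    /** Returns a new map holding only the entries that satisfy the predicate. */
    static <K, V> Map<K, V> filterMap(final Map<K, V> map, final Predicate<Map.Entry<K, V>> predicate) {
        return map.entrySet().stream()
            .filter(predicate)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** The method from the diff, rewritten on top of the helper. */
    static <V> Map<TaskKey, V> uncreatedTasksForTopologies(final Map<TaskKey, V> unknownTasksToBeCreated,
                                                           final Set<String> currentTopologies) {
        return filterMap(unknownTasksToBeCreated, e -> currentTopologies.contains(e.getKey().namedTopology()));
    }

    public static void main(final String[] args) {
        final Map<TaskKey, Set<String>> pending = new HashMap<>();
        pending.put(new TaskKey("a", 0), Collections.singleton("topic-a-0"));
        pending.put(new TaskKey("b", 1), Collections.singleton("topic-b-1"));
        System.out.println(uncreatedTasksForTopologies(pending, Collections.singleton("a")));
    }
}
```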
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
- private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+ private final TopologyVersion version;
+
+ private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
Review comment:
How about moving this into the TopologyVersion so that it is guarded by the same lock as well, instead of making it a concurrent data structure?
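A minimal sketch of that suggestion, assuming a hypothetical `GuardedTopologyVersion` class: the builders live in a plain `TreeMap` (still sorted by topology name) next to the version counter, and every read or write goes through the same `ReentrantLock`, so a version bump and the matching map change are always observed together. The builder type is left generic to keep the sketch self-contained.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical: version counter and builders guarded by one lock, so no concurrent map is needed.
class GuardedTopologyVersion<B> {
    private final ReentrantLock lock = new ReentrantLock();
    private final TreeMap<String, B> builders = new TreeMap<>(); // sorted by topology name
    private long version = 0L;

    /** Adds or replaces a builder and returns the new topology version. */
    long add(final String name, final B builder) {
        lock.lock();
        try {
            builders.put(name, builder);
            return ++version;
        } finally {
            lock.unlock();
        }
    }

    /** Removes a builder (if present) and returns the new topology version. */
    long remove(final String name) {
        lock.lock();
        try {
            builders.remove(name);
            return ++version;
        } finally {
            lock.unlock();
        }
    }

    B get(final String name) {
        lock.lock();
        try {
            return builders.get(name);
        } finally {
            lock.unlock();
        }
    }

    /** Returns a sorted snapshot, so callers can iterate without holding the lock. */
    Set<String> topologyNames() {
        lock.lock();
        try {
            return Collections.unmodifiableSet(new TreeSet<>(builders.keySet()));
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final GuardedTopologyVersion<String> topologies = new GuardedTopologyVersion<>();
        topologies.add("b", "builder-b");
        topologies.add("a", "builder-a");
        System.out.println(topologies.topologyNames()); // prints [a, b]
    }
}
```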
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
+ public long topologyVersion() {
+ return version.topologyVersion.get();
+ }
+
+ public void lock() {
Review comment:
This seems to be used only by the `updatePhase` outside the class itself. I think we'd better still keep it used only within a single function of the TopologyMetadata. See my other related comments.
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final Collection<NamedTopology> topolog
+ public void cleanUpNamedTopology(final String name) {
+ if (getTopologyByName(name).isPresent()) {
+ throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
Review comment:
"... since the topology name {} cannot be recognized"
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -327,6 +330,21 @@ public long lagFor(final TaskId task) {
return totalLag;
}
+ /**
+ * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
+ */
+ public SortedSet<TaskId> prevTasksByLag(final String consumer) {
+ final SortedSet<TaskId> prevTasksByLag = new TreeSet<>(comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
+ for (final TaskId task : prevOwnedStatefulTasksByConsumer(consumer)) {
+ if (taskLagTotals.containsKey(task)) {
+ prevTasksByLag.add(task);
+ } else {
+ LOG.debug("Skipping previous task{} since it's not part of the current assignment", task);
Review comment:
nit: missing space, i.e. `task {}`.
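The ordering used by `prevTasksByLag` in the diff (lag first, then task id, skipping tasks that are not part of the current assignment) can be sketched with plain strings for task ids and a map for lags; both simplifications, and the `PrevTasks` class name, are assumptions of this sketch. The tie-break matters because a `TreeSet` treats two elements that compare as equal as duplicates, so without it two different tasks with the same lag would collapse into one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical simplification of ClientState#prevTasksByLag: task ids are strings,
// and taskLagTotals only holds tasks that are part of the current assignment.
final class PrevTasks {
    private PrevTasks() { }

    static SortedSet<String> prevTasksByLag(final Collection<String> prevOwnedTasks,
                                            final Map<String, Long> taskLagTotals,
                                            final List<String> skipped) {
        final SortedSet<String> byLag = new TreeSet<>(
            Comparator.<String>comparingLong(task -> taskLagTotals.get(task))
                .thenComparing(Comparator.<String>naturalOrder()));
        for (final String task : prevOwnedTasks) {
            if (taskLagTotals.containsKey(task)) {
                byLag.add(task); // only tasks with a known lag ever reach the comparator
            } else {
                skipped.add(task); // the diff logs a DEBUG "Skipping previous task ..." message here
            }
        }
        return byLag;
    }

    public static void main(final String[] args) {
        final Map<String, Long> lags = new HashMap<>();
        lags.put("0_1", 5L);
        lags.put("0_2", 0L);
        lags.put("0_3", 5L);
        final List<String> skipped = new ArrayList<>();
        System.out.println(prevTasksByLag(Arrays.asList("0_1", "0_2", "0_3", "1_0"), lags, skipped)); // prints [0_2, 0_1, 0_3]
        System.out.println("skipped: " + skipped); // prints skipped: [1_0]
    }
}
```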
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;
+ // tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+ private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();
Review comment:
nit: extra space.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
+ private void buildAndVerifyTopology(final InternalTopologyBuilder builder) {
+ builder.rewriteTopology(config);
+ builder.buildTopology();
+
+ // As we go, check each topology for overlap in the set of input topics/patterns
+ final int numInputTopics = allInputTopics.size();
+ final List<String> inputTopics = builder.fullSourceTopicNames();
+ final Collection<String> inputPatterns = builder.allSourcePatternStrings();
+
+ final int numNewInputTopics = inputTopics.size() + inputPatterns.size();
+ allInputTopics.addAll(inputTopics);
+ allInputTopics.addAll(inputPatterns);
+ if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+ inputTopics.retainAll(allInputTopics);
+ inputPatterns.retainAll(allInputTopics);
+ inputTopics.addAll(inputPatterns);
+ log.error("Tried to add the NamedTopology {} but it had overlap with other input topics: {}", builder.topologyName(), inputTopics);
+ throw new TopologyException("Named Topologies may not subscribe to the same input topics or patterns");
+ }
+
+ final ProcessorTopology globalTopology =
builder.buildGlobalStateTopology();
+ if (globalTopology != null) {
+ if (builder.topologyName() != null) {
+ throw new IllegalStateException("Global state stores are not supported with Named Topologies");
+ } else if (this.globalTopology == null) {
+ this.globalTopology = globalTopology;
Review comment:
Just to clarify: we either support N topologies where none of them has a global topology, or we support just a single topology that has a global topology? What's the rationale for supporting the second scenario?
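The overlap check in `buildAndVerifyTopology` above works by comparing set sizes: if adding a builder's topics and patterns grows `allInputTopics` by less than their combined count, some name was already present. As quoted, though, `inputTopics.retainAll(allInputTopics)` runs after `allInputTopics.addAll(inputTopics)`, so it appears to keep every input topic of the new builder rather than only the conflicting ones, and the rejected inputs remain in `allInputTopics`. A minimal sketch of a variant that intersects first and only mutates the shared set once the check passes; the class `InputOverlapCheck` is hypothetical, and `IllegalArgumentException` stands in for `TopologyException` so the sketch has no Kafka dependency:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class InputOverlapCheck {
    private final Set<String> allInputTopics = new HashSet<>();

    // Registers one topology's source topics and pattern strings, rejecting any overlap.
    void register(final String topologyName,
                  final Collection<String> inputTopics,
                  final Collection<String> inputPatterns) {
        final Set<String> newInputs = new HashSet<>(inputTopics);
        newInputs.addAll(inputPatterns);
        // Mirrors the size check in the diff: a duplicate within this topology's own inputs also counts
        final boolean selfOverlap = newInputs.size() != inputTopics.size() + inputPatterns.size();
        // Intersect BEFORE touching the shared set, so only the conflicting names are reported
        final Set<String> overlap = new HashSet<>(newInputs);
        overlap.retainAll(allInputTopics);
        if (selfOverlap || !overlap.isEmpty()) {
            // Stand-in for Kafka's TopologyException (assumption, keeps the sketch dependency-free)
            throw new IllegalArgumentException("NamedTopology " + topologyName
                + " overlaps with other input topics: " + overlap);
        }
        allInputTopics.addAll(newInputs); // only mutated once the check has passed
    }

    Set<String> registered() {
        return allInputTopics;
    }
}
```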
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -64,6 +65,9 @@
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;
+ // tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+ private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();
Review comment:
Also, I'm wondering how that would be possible: if the thread/consumer has not updated its subscription, the leader should not assign any of the tasks of the newly added topics to it, right?
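For context on the mechanism quoted above, a minimal sketch of deferring tasks whose NamedTopology is not yet known and creating them once that topology is registered. `DeferredTaskCreation`, the simplified `TaskKey` record (standing in for `TaskId`, requires Java 16+) and the use of partition name strings instead of `TopicPartition` are all assumptions for illustration. Unlike a pure filter, `drainCreatableTasks` also removes the returned entries so they are not created twice:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class DeferredTaskCreation {
    // Hypothetical simplified task id: only the named topology matters for deferral
    record TaskKey(String namedTopology, int subtopology, int partition) { }

    // Tasks assigned for a NamedTopology this host does not know yet, saved for later creation
    private final Map<TaskKey, Set<String>> unknownTasksToBeCreated = new HashMap<>();

    void deferTask(final TaskKey task, final Set<String> partitions) {
        unknownTasksToBeCreated.put(task, partitions);
    }

    // Once new topologies are registered, return (and forget) the deferred tasks that can now be created
    Map<TaskKey, Set<String>> drainCreatableTasks(final Set<String> currentTopologies) {
        final Map<TaskKey, Set<String>> creatable = unknownTasksToBeCreated.entrySet().stream()
            .filter(e -> currentTopologies.contains(e.getKey().namedTopology()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        unknownTasksToBeCreated.keySet().removeAll(creatable.keySet());
        return creatable;
    }

    int pendingCount() {
        return unknownTasksToBeCreated.size();
    }
}
```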
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -53,44 +56,162 @@
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private final StreamsConfig config;
- private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+ private final TopologyVersion version;
+
+ private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
Review comment:
Also, we can optimize the per-thread condition in `StreamThread` for whether to enforce a new rebalance as: `builders.keySet().equals(assignedNamedTopologies)`. The former is the set of topologies this instance knows and the latter is the set of topologies the leader knows; when they are the same, there's no need to trigger a rebalance.
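A minimal sketch of the suggested check, assuming the two inputs are this instance's registered topology names (for example `builders.keySet()`) and the `assignedNamedTopologies` carried in the latest assignment; the class and method names are hypothetical. `Set.equals` compares contents regardless of implementation, so a `NavigableSet` key view and a `HashSet` compare correctly:

```java
import java.util.Set;

final class RebalanceCheck {
    // knownTopologies: names this instance has registered (e.g. builders.keySet())
    // assignedNamedTopologies: names carried in the latest assignment, i.e. what the leader knows
    static boolean shouldEnforceRebalance(final Set<String> knownTopologies,
                                          final Set<String> assignedNamedTopologies) {
        // Only when the two views disagree does the thread need to trigger a rebalance
        return !knownTopologies.equals(assignedNamedTopologies);
    }
}
```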
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -248,12 +249,29 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
handleRebalanceStart(topics);
uniqueField++;
+ final Set<String> currentNamedTopologies;
+ final Map<TaskId, Long> taskOffsetSums;
+ try {
+ taskManager.topologyMetadata().lock();
Review comment:
Similar to my other comment: I think we can just add a single method to `topologyMetadata` that is synchronized on the lock, instead of relying on the caller to grab the lock? More specifically, `topologyMetadata().hasNamedTopologies()` can be replaced with `!currentNamedTopologies.isEmpty()`, and as long as `namedTopologiesView()` is synchronized that should be sufficient?
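A minimal sketch of that pattern, in which the metadata class takes its own lock and returns an immutable snapshot, so callers never call `lock()`/`unlock()` themselves. `LockedTopologyMetadata` and `register` are hypothetical; only the `namedTopologiesView()` name comes from the comment, and a plain `TreeMap` with placeholder values stands in for the real builders map:

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

final class LockedTopologyMetadata {
    private final ReentrantLock topologyLock = new ReentrantLock();
    // Placeholder values stand in for InternalTopologyBuilder instances
    private final NavigableMap<String, Object> builders = new TreeMap<>();

    void register(final String name) {
        topologyLock.lock();
        try {
            builders.put(name, new Object());
        } finally {
            topologyLock.unlock();
        }
    }

    // Takes the lock internally and returns a copy, so later registrations don't change the caller's view
    Set<String> namedTopologiesView() {
        topologyLock.lock();
        try {
            return Collections.unmodifiableSet(new TreeSet<>(builders.keySet()));
        } finally {
            topologyLock.unlock();
        }
    }
}
```

A caller such as `subscriptionUserData` could then take one snapshot and derive whether there are named topologies from `!currentNamedTopologies.isEmpty()`.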
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -67,13 +72,31 @@
public class NamedTopologyIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+ // TODO KAFKA-12648:
Review comment:
This is a meta question: do we have coverage for scenarios where the leader's and a member's bookkept sets of named topologies differ? I.e. 1) the leader would not try to create any tasks that its own topology metadata is not aware of, even if other subscriptions contain more topics, and 2) vice versa, the other members would not try to create tasks from the assignment that their topology metadata does not recognize, but would later create those tasks once the topologies get added?
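Scenario 1 reduces to an invariant a test could assert: whatever members subscribe to, the leader only considers topics that belong to topologies its own metadata knows. A minimal sketch of that filter, assuming a hypothetical map from each known NamedTopology to its source topics (the real assignor works on its own metadata structures, not on this map):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class LeaderTopicFilter {
    // leaderTopologies: hypothetical map from each NamedTopology the leader knows to its source topics
    static Set<String> assignableTopics(final Map<String, Set<String>> leaderTopologies,
                                        final Set<String> subscribedTopics) {
        final Set<String> known = new TreeSet<>();
        leaderTopologies.values().forEach(known::addAll);
        final Set<String> assignable = new TreeSet<>(subscribedTopics);
        // Drop topics from topologies the leader has not registered yet
        assignable.retainAll(known);
        return assignable;
    }
}
```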
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -867,6 +873,36 @@ private void initializeAndRestorePhase() {
log.debug("Idempotent restore call done. Thread state has not changed.");
}
+ private void handleTopologyUpdatesPhase() {
+ // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+ // or if this is the very first topology in which case we may need to wait for it to be non-empty
+ if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || lastSeenTopologyVersion == 0) {
+ try {
Review comment:
See my other comment: I'm wondering, if we move `topologyMetadata.maybeWaitForNonEmptyTopology();` out of this function, do we still need to grab the lock of topologyMetadata? It seems we can wrap the remaining getters into a single call, i.e.:
1. Update the topology version if possible (this only needs a getter on the topologyMetadata), returning a boolean that indicates whether the topology has changed.
2. Check the current topology: if it is empty, first call `handleEmptyTopology` and then `maybeWaitForNonEmptyTopology`.
3. Otherwise, check whether step 1 returned true; if so, call `taskManager.maybeCreateTasksFromNewTopologies`, and also subscribe the consumer and enforce a rebalance. (BTW, if the topology has changed but the consumer subscription has not been updated yet, I'm not sure this thread/consumer could ever get tasks of those not-yet-known topologies; I think the leader would not assign tasks to members that have not subscribed to them yet.)
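The three steps above can be sketched as follows. This is a hypothetical, heavily simplified stand-in: `Metadata` models only a version counter and an emptiness flag, and each step records the name of the call it would make (`handleEmptyTopology`, `maybeWaitForNonEmptyTopology`, `maybeCreateTasksFromNewTopologies`, subscribing the consumer, enforcing a rebalance) instead of invoking real `StreamThread` or `TaskManager` code. Step 1 only reads the version and takes no lock:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class TopologyUpdatePhase {
    // Hypothetical stand-in for TopologyMetadata: a version counter and an emptiness flag only
    static final class Metadata {
        final AtomicLong version = new AtomicLong(0L);
        volatile boolean empty = true;

        long topologyVersion() {
            return version.get();
        }

        boolean isEmpty() {
            return empty;
        }
    }

    private final Metadata metadata;
    private long lastSeenTopologyVersion = 0L;
    final List<String> actions = new ArrayList<>(); // names of the calls each step would make

    TopologyUpdatePhase(final Metadata metadata) {
        this.metadata = metadata;
    }

    // Step 1: a plain getter, no lock; returns true if the version moved since the last check
    private boolean maybeUpdateTopologyVersion() {
        final long latest = metadata.topologyVersion();
        if (latest > lastSeenTopologyVersion) {
            lastSeenTopologyVersion = latest;
            return true;
        }
        return false;
    }

    void handleTopologyUpdatesPhase() {
        final boolean changed = maybeUpdateTopologyVersion();
        if (metadata.isEmpty()) {
            // Step 2: empty topology, handle it and then block until something is added
            actions.add("handleEmptyTopology");
            actions.add("maybeWaitForNonEmptyTopology");
        } else if (changed) {
            // Step 3: non-empty and changed, create pending tasks and rejoin the group
            actions.add("maybeCreateTasksFromNewTopologies");
            actions.add("subscribeConsumer");
            actions.add("enforceRebalance");
        }
    }
}
```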
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -935,6 +944,14 @@ void shutdown(final boolean clean) {
return tasksToCloseDirty;
}
+ public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) {
+ topologyMetadata.updateCurrentAssignmentTopology(assignedNamedTopologies);
Review comment:
The propagated `assignedNamedTopologies` is not used yet anywhere in the
KS code. Could you elaborate a bit how it would be used in the future?
--
This is an automated message from the Apache Git Service.