This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f54fae KAFKA-12648: fix #add/removeNamedTopology blocking behavior
when app is in CREATED (#11813)
6f54fae is described below
commit 6f54faed2d0792f3a36534fd7e6d00b6603253a8
Author: A. Sophie Blee-Goldman
AuthorDate: Fri Mar 4 09:58:56 2022 -0800
KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in
CREATED (#11813)
Currently the #add/removeNamedTopology APIs behave a little wonky when the
application is still in CREATED. Since adding and removing topologies runs some
validation steps there is valid reason to want to add or remove a topology on a
dummy app that you don't plan to start, or a real app that you haven't started
yet. But to actually check the results of the validation you need to call get()
on the future, so we need to make sure that get() won't block forever in the
case of no failure [...]
Reviewers: Guozhang Wang , Walker Carlson
---
.../KafkaStreamsNamedTopologyWrapper.java | 111 +
.../processor/internals/NamedTopologyTest.java | 28 +-
2 files changed, 97 insertions(+), 42 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index fb005d1..6355cae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -99,8 +99,10 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
/**
* Start up Streams with a collection of initial NamedTopologies (may be
empty)
+ *
+ * Note: this is synchronized to ensure that the application state cannot
change while we add topologies
*/
-public void start(final Collection initialTopologies) {
+public synchronized void start(final Collection
initialTopologies) {
log.info("Starting Streams with topologies: {}", initialTopologies);
for (final NamedTopology topology : initialTopologies) {
final AddNamedTopologyResult addNamedTopologyResult =
addNamedTopology(topology);
@@ -145,7 +147,7 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
/**
* @return the NamedTopology for the specific name, or Optional.empty() if
the application has no NamedTopology of that name
*/
-public Optional getTopologyByName(final String name) {
+public synchronized Optional getTopologyByName(final String
name) {
return
Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
}
@@ -180,7 +182,9 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
);
} else {
topologyMetadata.registerAndBuildNewTopology(future,
newTopology.internalTopologyBuilder());
+maybeCompleteFutureIfStillInCREATED(future, "adding topology " +
newTopology.name());
}
+
return new AddNamedTopologyResult(future);
}
@@ -205,7 +209,8 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
if (hasStartedOrFinishedShuttingDown()) {
log.error("Attempted to remove topology {} from while the Kafka
Streams was in state {}, "
- + "application must be started first.",
topologyToRemove, state
+ + "topologies cannot be modified if the application
has begun or completed shutting down.",
+ topologyToRemove, state
);
removeTopologyFuture.completeExceptionally(
new IllegalStateException("Cannot remove a NamedTopology while
the state is " + super.state)
@@ -218,6 +223,7 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
new UnknownTopologyException("Unable to remove topology",
topologyToRemove)
);
}
+
final Set partitionsToReset = metadataForLocalThreads()
.stream()
.flatMap(t -> {
@@ -230,53 +236,76 @@ public class KafkaStreamsNamedTopologyWrapper extends
KafkaStreams {
topologyMetadata.unregisterTopology(removeTopologyFuture,
topologyToRemove);
-if (resetOffsets) {
+final boolean skipResetForUnstartedApplication =
+maybeCompleteFutureIfStillInCREATED(removeTopologyFuture,
"removing topology " + topologyToRemove);
+
+if (resetOffsets && !skipResetForUnstartedApplication) {