cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r875647755
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. + */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { Review Comment: nit: `allNamedTopology` -> `namedTopology` since it is just one. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. 
+ */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { + topologyMetadata.pauseTopology(allNamedTopology.name()); + } + } else { + topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); + } + } + + /** + * @return true when the KafkaStreams instance has its processing paused. + */ + public boolean isPaused() { + if (topologyMetadata.hasNamedTopologies()) { + return topologyMetadata.getAllNamedTopologies() + .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused); + + } else { + return topologyMetadata.isPaused(UNNAMED_TOPOLOGY); + } + } + + /** + * This method resumes processing for the KafkaStreams instance. + */ + public void resume() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { Review Comment: nit: same as above ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. 
+ */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { + topologyMetadata.pauseTopology(allNamedTopology.name()); + } + } else { + topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); + } + } + + /** + * @return true when the KafkaStreams instance has its processing paused. + */ + public boolean isPaused() { + if (topologyMetadata.hasNamedTopologies()) { + return topologyMetadata.getAllNamedTopologies() + .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused); + Review Comment: Please remove empty line. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java: ########## @@ -35,21 +35,27 @@ public class TaskExecutionMetadata { private static final long CONSTANT_BACKOFF_MS = 5_000L; private final boolean hasNamedTopologies; + private final Set<String> pausedTopologies; // map of topologies experiencing errors/currently under backoff private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>(); - public TaskExecutionMetadata(final Set<String> allTopologyNames) { + public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) { this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); + this.pausedTopologies = pausedTopologies; } public boolean canProcessTask(final Task task, final long now) { Review Comment: I am aware that there are no unit tests for this class yet, but this method has enough distinct code paths to justify adding unit tests for it. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ########## @@ -273,6 +274,12 @@ Collection<Task> allTasks() { return readOnlyTasks; } + Collection<Task> notPausedTasks() { Review Comment: Unit tests would be great!
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. + */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { + topologyMetadata.pauseTopology(allNamedTopology.name()); + } + } else { + topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); + } + } + + /** + * @return true when the KafkaStreams instance has its processing paused. + */ + public boolean isPaused() { + if (topologyMetadata.hasNamedTopologies()) { + return topologyMetadata.getAllNamedTopologies() + .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused); Review Comment: ```suggestion return topologyMetadata.getAllNamedTopologies().stream() .map(NamedTopology::name) .allMatch(topologyMetadata::isPaused); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -463,7 +463,10 @@ public void restore(final Map<TaskId, Task> tasks) { final TaskId taskId = changelogs.get(partition).stateManager.taskId(); try { if (restoreChangelog(changelogs.get(partition))) { Review Comment: Are you sure this method call avoids restoring state stores of paused tasks? 
Wouldn't `restoringChangelogs()` still return all changelog partitions that are in restoration and not just those that are not paused? Maybe it is possible to add some verifications to the unit tests to ensure that only non-paused tasks are restored. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -897,7 +897,8 @@ private void initializeAndRestorePhase() { } // we can always let changelog reader try restoring in order to initialize the changelogs; // if there's no active restoring or standby updating it would not try to fetch any data - changelogReader.restore(taskManager.tasks()); + // After KAFKA-13873, we only restore the not paused tasks. + changelogReader.restore(taskManager.notPausedTasks()); Review Comment: This should also be verified in a unit test with a mock changelog reader. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. + */ + public void pause() { Review Comment: Could you please add unit tests for the new methods? 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java: ########## @@ -257,6 +260,35 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl<Void> future, fina } } + /** + * Pauses a topology by name + * @param topologyName Name of the topology to pause + */ + public void pauseTopology(final String topologyName) { Review Comment: These methods could also be unit tested really easily. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java: ########## @@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() { int punctuate() { int punctuated = 0; - for (final Task task : tasks.activeTasks()) { + for (final Task task : tasks.notPausedTasks()) { Review Comment: Also here unit tests would be great and easily doable. ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. + */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { + topologyMetadata.pauseTopology(allNamedTopology.name()); + } + } else { + topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); + } + } + + /** + * @return true when the KafkaStreams instance has its processing paused. 
+ */ + public boolean isPaused() { + if (topologyMetadata.hasNamedTopologies()) { + return topologyMetadata.getAllNamedTopologies() + .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused); Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org