cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r875647755


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {

Review Comment:
   nit: `allNamedTopology` -> `namedTopology` since it is just one. 



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(allNamedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies()
+                .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused);
+
+        } else {
+            return topologyMetadata.isPaused(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * This method resumes processing for the KafkaStreams instance.
+     */
+    public void resume() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {

Review Comment:
   nit: same as above



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(allNamedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies()
+                .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused);
+

Review Comment:
   Please remove empty line.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -35,21 +35,27 @@ public class TaskExecutionMetadata {
     private static final long CONSTANT_BACKOFF_MS = 5_000L;
 
     private final boolean hasNamedTopologies;
+    private final Set<String> pausedTopologies;
     // map of topologies experiencing errors/currently under backoff
     private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
 
-    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+    public TaskExecutionMetadata(final Set<String> allTopologyNames, final Set<String> pausedTopologies) {
         this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        this.pausedTopologies = pausedTopologies;
     }
 
     public boolean canProcessTask(final Task task, final long now) {

Review Comment:
   I am aware that there are no unit tests for this class, but this method has enough different code paths to justify adding unit tests for it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -273,6 +274,12 @@ Collection<Task> allTasks() {
         return readOnlyTasks;
     }
 
+    Collection<Task> notPausedTasks() {

Review Comment:
   Unit tests would be great!
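
   A minimal sketch of what such a test could pin down, using hypothetical stand-ins (`NotPausedTasksSketch`, `FakeTask`, `notPausedTaskIds`) rather than the real `Tasks` and `Task` classes. It assumes `notPausedTasks()` keeps every task whose topology name is not in the paused set; the hunk above does not show the method body, so that behavior is an assumption.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration; not the real Tasks class.
final class NotPausedTasksSketch {

    // Simplified task that carries only what the filter needs.
    static final class FakeTask {
        final String id;
        final String topologyName;

        FakeTask(final String id, final String topologyName) {
            this.id = id;
            this.topologyName = topologyName;
        }
    }

    // Assumed behavior: drop every task whose topology is currently paused.
    static List<String> notPausedTaskIds(final Collection<FakeTask> allTasks,
                                         final Set<String> pausedTopologies) {
        return allTasks.stream()
            .filter(task -> !pausedTopologies.contains(task.topologyName))
            .map(task -> task.id)
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<FakeTask> tasks = Arrays.asList(
            new FakeTask("orders__0_0", "orders"),
            new FakeTask("orders__0_1", "orders"),
            new FakeTask("audit__0_0", "audit"));

        // Pause one topology, then resume it, and print what remains runnable.
        final Set<String> paused = new HashSet<>(Collections.singleton("orders"));
        System.out.println(notPausedTaskIds(tasks, paused));
        paused.clear();
        System.out.println(notPausedTaskIds(tasks, paused));
    }
}
```

   The three cases worth covering are probably: nothing paused, one topology paused, and every topology paused.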



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(allNamedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies()
+                .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused);

Review Comment:
   ```suggestion
               return topologyMetadata.getAllNamedTopologies().stream()
                   .map(NamedTopology::name)
                   .allMatch(topologyMetadata::isPaused);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -463,7 +463,10 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                 try {
                     if (restoreChangelog(changelogs.get(partition))) {

Review Comment:
   Are you sure this method call avoids restoring state stores of paused tasks? Wouldn't `restoringChangelogs()` still return all changelog partitions that are in restoration, not just those that belong to non-paused tasks?
   Maybe it is possible to add some verifications to the unit tests to ensure that only non-paused tasks are restored.
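
   To make the concern concrete, here is a minimal sketch with hypothetical names (`RestoreFilterSketch`, `partitionsToRestore`); it does not use the real `StoreChangelogReader` API. It assumes the reader tracks every registered changelog partition together with its owning task, in which case passing a smaller task map into `restore()` only helps if the loop also skips partitions whose task is missing from that map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for illustration; not the real StoreChangelogReader.
final class RestoreFilterSketch {

    // restoringChangelogs maps a changelog partition name to the ID of its owning task.
    // Only partitions whose task is in notPausedTaskIds are selected for restoration.
    static List<String> partitionsToRestore(final Map<String, String> restoringChangelogs,
                                            final Set<String> notPausedTaskIds) {
        final List<String> result = new ArrayList<>();
        for (final Map.Entry<String, String> entry : restoringChangelogs.entrySet()) {
            if (notPausedTaskIds.contains(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // LinkedHashMap keeps insertion order so the result is deterministic.
        final Map<String, String> changelogs = new LinkedHashMap<>();
        changelogs.put("store-a-changelog-0", "0_0");
        changelogs.put("store-b-changelog-0", "1_0");

        // Task 0_0 belongs to a paused topology, so only 1_0 is passed in.
        final Set<String> notPaused = Collections.singleton("1_0");
        System.out.println(partitionsToRestore(changelogs, notPaused));
    }
}
```

   A unit test along these lines would fail if the restore loop iterated over all restoring changelogs without consulting the task map it was given.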



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -897,7 +897,8 @@ private void initializeAndRestorePhase() {
         }
         // we can always let changelog reader try restoring in order to initialize the changelogs;
         // if there's no active restoring or standby updating it would not try to fetch any data
-        changelogReader.restore(taskManager.tasks());
+        // After KAFKA-13873, we only restore the not paused tasks.
+        changelogReader.restore(taskManager.notPausedTasks());

Review Comment:
   This should also be verified in a unit test with a mock changelog reader.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {

Review Comment:
   Could you please add unit tests for the new methods? 
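
   As a rough sketch of the cases such tests could cover, the class below models the instance-level state with hypothetical names (`PauseStateSketch` and its fields); it is not the real `KafkaStreams` or `TopologyMetadata` code, and it folds the unnamed-topology branch into the same set for brevity. One point that may deserve its own test: `Stream.allMatch` returns `true` for an empty stream, so `isPaused()` would report `true` if the set of named topologies were ever empty (whether that state is reachable here is an assumption).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the pause bookkeeping; not the real KafkaStreams class.
final class PauseStateSketch {
    private final Set<String> topologyNames;
    private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

    PauseStateSketch(final Set<String> topologyNames) {
        this.topologyNames = topologyNames;
    }

    // Pauses every known topology.
    void pause() {
        pausedTopologies.addAll(topologyNames);
    }

    // Resumes every known topology.
    void resume() {
        pausedTopologies.removeAll(topologyNames);
    }

    // Pauses a single topology by name.
    void pauseTopology(final String name) {
        pausedTopologies.add(name);
    }

    // True only when all known topologies are paused (vacuously true if there are none).
    boolean isPaused() {
        return topologyNames.stream().allMatch(pausedTopologies::contains);
    }

    public static void main(final String[] args) {
        final PauseStateSketch streams =
            new PauseStateSketch(new HashSet<>(Arrays.asList("orders", "audit")));
        streams.pauseTopology("orders");
        System.out.println(streams.isPaused()); // partial pause
        streams.pause();
        System.out.println(streams.isPaused()); // full pause
        streams.resume();
        System.out.println(streams.isPaused()); // resumed
        System.out.println(new PauseStateSketch(Collections.<String>emptySet()).isPaused());
    }
}
```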



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java:
##########
@@ -257,6 +260,35 @@ public void registerAndBuildNewTopology(final KafkaFutureImpl<Void> future, fina
         }
     }
 
+    /**
+     * Pauses a topology by name
+     * @param topologyName Name of the topology to pause
+     */
+    public void pauseTopology(final String topologyName) {

Review Comment:
   These methods could also be unit tested really easily.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -275,7 +275,7 @@ private void commitSuccessfullyProcessedTasks() {
     int punctuate() {
         int punctuated = 0;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.notPausedTasks()) {

Review Comment:
   Unit tests would be great here as well, and they should be easy to add.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
         return queryableStoreProvider.getStore(storeQueryParameters);
     }
 
+    /**
+     *  This method pauses processing for the KafkaStreams instance.
+     *
+     *  Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     *  Notably, paused topologies will still poll Kafka consumers, and commit offsets.
+     *  This method sets transient state that is not maintained or managed among instances.
+     *  Note that pause() can be called before start() in order to start a KafkaStreams instance
+     *  in a manner where the processing is paused as described, but the consumers are started up.
+     */
+    public void pause() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) {
+                topologyMetadata.pauseTopology(allNamedTopology.name());
+            }
+        } else {
+            topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+        }
+    }
+
+    /**
+     * @return true when the KafkaStreams instance has its processing paused.
+     */
+    public boolean isPaused() {
+        if (topologyMetadata.hasNamedTopologies()) {
+            return topologyMetadata.getAllNamedTopologies()
+                .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused);

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
