cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r880526257


##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -783,6 +822,28 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
         }
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+            streams.pause();
+            waitForCondition(
+                streams::isPaused,
+                "Streams did not paused");

Review Comment:
   typo: 
   ```suggestion
                   "Streams did not pause");
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
     public void restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-            throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+        if (!activeRestoringChangelogs().isEmpty()
+            && state == ChangelogReaderState.STANDBY_UPDATING) {

Review Comment:
   nit: Why this refactoring? The condition fits in one line.
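   e.g. restoring the removed one-liner:
   ```suggestion
        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
   ```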



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
     public void restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-            throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+        if (!activeRestoringChangelogs().isEmpty()
+            && state == ChangelogReaderState.STANDBY_UPDATING) {
+            throw new IllegalStateException(
+                "Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
         if (allChangelogsCompleted()) {
             log.debug("Finished restoring all changelogs {}", 
changelogs.keySet());
-            return;
-        }
+        } else {
+            final Set<TopicPartition> restoringChangelogs = 
restoringChangelogs();
+            if (!restoringChangelogs.isEmpty()) {
+                final ConsumerRecords<byte[], byte[]> polledRecords;
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
-        if (!restoringChangelogs.isEmpty()) {
-            final ConsumerRecords<byte[], byte[]> polledRecords;
+                try {
+                    // for restoring active and updating standby we may prefer 
different poll time
+                    // in order to make sure we call the main consumer#poll in 
time.
+                    // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
+                    // JNH: Fix this?
+                    // Update state based on paused/resumed status.
+                    for (final TopicPartition partition : restoringChangelogs) 
{
+                        final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
+                        final Task task = tasks.get(taskId);
+                        if (task != null) {
+                            
restoreConsumer.resume(Collections.singleton(partition));
+                        } else {
+                            
restoreConsumer.pause(Collections.singleton(partition));
+                        }
+                    }
 
-            try {
-                // for restoring active and updating standby we may prefer 
different poll time
-                // in order to make sure we call the main consumer#poll in 
time.
-                // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state == 
ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
-
-                // TODO (?) If we cannot fetch records during restore, should 
we trigger `task.timeout.ms` ?
-                // TODO (?) If we cannot fetch records for standby task, 
should we trigger `task.timeout.ms` ?
-            } catch (final InvalidOffsetException e) {
-                log.warn("Encountered " + e.getClass().getName() +
-                    " fetching records from restore consumer for partitions " 
+ e.partitions() + ", it is likely that " +
-                    "the consumer's position has fallen out of the topic 
partition offset range because the topic was " +
-                    "truncated or compacted on the broker, marking the 
corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e);
-
-                final Set<TaskId> corruptedTasks = new HashSet<>();
-                e.partitions().forEach(partition -> 
corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
-                throw new TaskCorruptedException(corruptedTasks, e);
-            } catch (final KafkaException e) {
-                throw new StreamsException("Restore consumer get unexpected 
error polling records.", e);
-            }
+                    polledRecords = restoreConsumer.poll(
+                        state == ChangelogReaderState.STANDBY_UPDATING ? 
Duration.ZERO : pollTime);
+
+                    // TODO (?) If we cannot fetch records during restore, 
should we trigger `task.timeout.ms` ?
+                    // TODO (?) If we cannot fetch records for standby task, 
should we trigger `task.timeout.ms` ?
+                } catch (final InvalidOffsetException e) {
+                    log.warn("Encountered " + e.getClass().getName() +
+                        " fetching records from restore consumer for 
partitions " + e.partitions()
+                        + ", it is likely that " +
+                        "the consumer's position has fallen out of the topic 
partition offset range because the topic was "
+                        +
+                        "truncated or compacted on the broker, marking the 
corresponding tasks as corrupted and re-initializing"
+                        +
+                        " it later.", e);
+
+                    final Set<TaskId> corruptedTasks = new HashSet<>();
+                    e.partitions().forEach(partition -> corruptedTasks.add(
+                        changelogs.get(partition).stateManager.taskId()));
+                    throw new TaskCorruptedException(corruptedTasks, e);
+                } catch (final KafkaException e) {
+                    throw new StreamsException(
+                        "Restore consumer get unexpected error polling 
records.", e);
+                }
 
-            for (final TopicPartition partition : polledRecords.partitions()) {
-                
bufferChangelogRecords(restoringChangelogByPartition(partition), 
polledRecords.records(partition));
-            }
+                // JNH: Fix this?

Review Comment:
   Please do not forget to remove those comments here and elsewhere.



##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -783,6 +822,28 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
         }
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+            streams.pause();
+            waitForCondition(
+                streams::isPaused,
+                "Streams did not paused");
+
+            try {
+                streams.cleanUp();
+                fail("Should have thrown IllegalStateException");
+            } catch (final IllegalStateException expected) {
+                assertEquals("Cannot clean up while running.", expected.getMessage());
+            }

Review Comment:
   Wouldn't it be simpler to use `assertThrows()`?
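   e.g. a sketch, assuming JUnit's `assertThrows` is available in this test class:
   ```java
   final IllegalStateException expected = assertThrows(
       IllegalStateException.class,
       streams::cleanUp);
   assertEquals("Cannot clean up while running.", expected.getMessage());
   ```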



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1761,6 +1761,96 @@ public void shouldUpdateStandbyTask() throws Exception {
         thread.taskManager().shutdown(true);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
+        final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
+        final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
+        internalStreamsBuilder
+            .stream(Collections.singleton(topic1), consumed)
+            .groupByKey()
+            .count(Materialized.as(storeName1));
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
+
+        internalStreamsBuilder.buildAndOptimizeTopology();
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName1,
+            Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0]))
+        );
+
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        final OffsetCheckpoint checkpoint
+            = new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(task3), CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(partition2, 5L));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
+        thread.taskManager().handleAssignment(emptyMap(), standbyTasks) != null ? null : null;
+
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName1,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName2,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+
+        // Simulate pause
+        thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+        thread.runOnce();
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // Simulate resume
+        thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+        thread.runOnce();
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(4L, store2.approximateNumEntries());

Review Comment:
   Why 4 and not 5?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
     public void restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-            throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+        if (!activeRestoringChangelogs().isEmpty()
+            && state == ChangelogReaderState.STANDBY_UPDATING) {
+            throw new IllegalStateException(
+                "Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
         if (allChangelogsCompleted()) {
             log.debug("Finished restoring all changelogs {}", 
changelogs.keySet());
-            return;
-        }
+        } else {
+            final Set<TopicPartition> restoringChangelogs = 
restoringChangelogs();
+            if (!restoringChangelogs.isEmpty()) {
+                final ConsumerRecords<byte[], byte[]> polledRecords;
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
-        if (!restoringChangelogs.isEmpty()) {
-            final ConsumerRecords<byte[], byte[]> polledRecords;
+                try {
+                    // for restoring active and updating standby we may prefer different poll time
+                    // in order to make sure we call the main consumer#poll in time.
+                    // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern

Review Comment:
   I think these comments should be moved before the call to `restoreConsumer.poll()`. BTW, this again confirms that inline comments are a poor way to document code, most of the time.
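   For illustration, roughly what I mean (just the existing poll comments moved next to the poll itself):
   ```java
   // for restoring active and updating standby we may prefer different poll time
   // in order to make sure we call the main consumer#poll in time.
   // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
   polledRecords = restoreConsumer.poll(
       state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
   ```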



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies,
+            pausedTopologies);
+
+        final Task mockTask = createMockTask(UNNAMED_TOPOLOGY);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask, 0));
+        // This pauses an UNNAMED_TOPOLOGY / a KafkaStreams instance without named/modular
+        // topologies.
+        pausedTopologies.add(UNNAMED_TOPOLOGY);
+        Assert.assertFalse(metadata.canProcessTask(mockTask, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBePausedIndependently() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.add(TOPOLOGY1);
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBeStartedPaused() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+        pausedTopologies.add(TOPOLOGY1);
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBackoff() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        metadata.registerTaskError(mockTask1, new Throwable("Error"), 0);
+        // One second after the error, task1 cannot process, task2 can.
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 1000));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 1000));
+

Review Comment:
   Could you add a case where the time since the error is exactly the backoff time (i.e. 5000)?
   I would prefer to put the times 1000, 5000, and 10000 into variables with meaningful names instead of adding comments.
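   A sketch of what I have in mind (the constant names are only illustrative, and the expected result at exactly the backoff time has to match the actual comparison in `TaskExecutionMetadata`):
   ```java
   private static final long TIME_BEFORE_BACKOFF_EXPIRES = 1000L;
   private static final long CONSTANT_BACKOFF_MS = 5000L;
   private static final long TIME_AFTER_BACKOFF_EXPIRES = 10000L;

   metadata.registerTaskError(mockTask1, new Throwable("Error"), 0);
   // before the backoff expires, task1 cannot process while task2 can
   Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_BEFORE_BACKOFF_EXPIRES));
   Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_BEFORE_BACKOFF_EXPIRES));
   // boundary case: exactly the backoff time after the error
   Assert.assertFalse(metadata.canProcessTask(mockTask1, CONSTANT_BACKOFF_MS));
   // well past the backoff, task1 can process again
   Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_AFTER_BACKOFF_EXPIRES));
   ```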



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
