cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r880526257
##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -783,6 +822,28 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
         }
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+            streams.pause();
+            waitForCondition(
+                streams::isPaused,
+                "Streams did not paused");

Review Comment:
   typo:
   ```suggestion
                "Streams did not pause");
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
     public void restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-            throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+        if (!activeRestoringChangelogs().isEmpty()
+            && state == ChangelogReaderState.STANDBY_UPDATING) {

Review Comment:
   nit: Why this refactoring? The condition fits in one line.
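For reference, a sketch of the single-line form the nit refers to, reassembled from the removed lines in the diff above:

```java
if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
    throw new IllegalStateException(
        "Should not be in standby updating state if there are still un-completed active changelogs");
}
```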
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() {
     public void restore(final Map<TaskId, Task> tasks) {
         initializeChangelogs(tasks, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
-            throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs");
+        if (!activeRestoringChangelogs().isEmpty()
+            && state == ChangelogReaderState.STANDBY_UPDATING) {
+            throw new IllegalStateException(
+                "Should not be in standby updating state if there are still un-completed active changelogs");
         }
 
         if (allChangelogsCompleted()) {
             log.debug("Finished restoring all changelogs {}", changelogs.keySet());
-            return;
-        }
+        } else {
+            final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
+            if (!restoringChangelogs.isEmpty()) {
+                final ConsumerRecords<byte[], byte[]> polledRecords;
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
-        if (!restoringChangelogs.isEmpty()) {
-            final ConsumerRecords<byte[], byte[]> polledRecords;
+                try {
+                    // for restoring active and updating standby we may prefer different poll time
+                    // in order to make sure we call the main consumer#poll in time.
+                    // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
+                    // JNH: Fix this?
+                    // Update state based on paused/resumed status.
+                    for (final TopicPartition partition : restoringChangelogs) {
+                        final TaskId taskId = changelogs.get(partition).stateManager.taskId();
+                        final Task task = tasks.get(taskId);
+                        if (task != null) {
+                            restoreConsumer.resume(Collections.singleton(partition));
+                        } else {
+                            restoreConsumer.pause(Collections.singleton(partition));
+                        }
+                    }
 
-            try {
-                // for restoring active and updating standby we may prefer different poll time
-                // in order to make sure we call the main consumer#poll in time.
-                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
-
-                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
-                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
-            } catch (final InvalidOffsetException e) {
-                log.warn("Encountered " + e.getClass().getName() +
-                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
-                    "the consumer's position has fallen out of the topic partition offset range because the topic was " +
-                    "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e);
-
-                final Set<TaskId> corruptedTasks = new HashSet<>();
-                e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
-                throw new TaskCorruptedException(corruptedTasks, e);
-            } catch (final KafkaException e) {
-                throw new StreamsException("Restore consumer get unexpected error polling records.", e);
-            }
+                    polledRecords = restoreConsumer.poll(
+                        state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
+
+                    // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
+                    // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?
+                } catch (final InvalidOffsetException e) {
+                    log.warn("Encountered " + e.getClass().getName() +
+                        " fetching records from restore consumer for partitions " + e.partitions() +
+                        ", it is likely that " +
+                        "the consumer's position has fallen out of the topic partition offset range because the topic was " +
+                        "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
+                        " it later.", e);
+
+                    final Set<TaskId> corruptedTasks = new HashSet<>();
+                    e.partitions().forEach(partition -> corruptedTasks.add(
+                        changelogs.get(partition).stateManager.taskId()));
+                    throw new TaskCorruptedException(corruptedTasks, e);
+                } catch (final KafkaException e) {
+                    throw new StreamsException(
+                        "Restore consumer get unexpected error polling records.", e);
+                }
 
-            for (final TopicPartition partition : polledRecords.partitions()) {
-                bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
-            }
+                // JNH: Fix this?

Review Comment:
   Please do not forget to remove those comments here and elsewhere.

##########
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##########
@@ -783,6 +822,28 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
         }
     }
 
+    @Test
+    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started.");
+
+            streams.pause();
+            waitForCondition(
+                streams::isPaused,
+                "Streams did not paused");
+
+            try {
+                streams.cleanUp();
+                fail("Should have thrown IllegalStateException");
+            } catch (final IllegalStateException expected) {
+                assertEquals("Cannot clean up while running.", expected.getMessage());
+            }

Review Comment:
   Wouldn't it be simpler to use `assertThrows()`?
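For illustration, a minimal sketch of the `assertThrows()` variant, assuming the JUnit 4.13 `org.junit.Assert.assertThrows` overload that returns the thrown exception:

```java
// Replaces the try/fail/catch pattern with a single assertion.
final IllegalStateException expected =
    assertThrows(IllegalStateException.class, streams::cleanUp);
assertEquals("Cannot clean up while running.", expected.getMessage());
```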
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1761,6 +1761,96 @@ public void shouldUpdateStandbyTask() throws Exception {
         thread.taskManager().shutdown(true);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
+        final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
+        final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
+        internalStreamsBuilder
+            .stream(Collections.singleton(topic1), consumed)
+            .groupByKey()
+            .count(Materialized.as(storeName1));
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
+
+        internalStreamsBuilder.buildAndOptimizeTopology();
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName1,
+            Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0]))
+        );
+
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        final OffsetCheckpoint checkpoint
+            = new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(task3), CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(partition2, 5L));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
+        thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null);
+
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName1,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName2,
+                1,
+                i,
+                ("K" + i).getBytes(),
("V" + i).getBytes())); + } + + // Simulate pause + thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY); + thread.runOnce(); + assertEquals(0L, store1.approximateNumEntries()); + assertEquals(0L, store2.approximateNumEntries()); + + // Simulate resume + thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY); + thread.runOnce(); + + assertEquals(10L, store1.approximateNumEntries()); + assertEquals(4L, store2.approximateNumEntries()); Review Comment: Why 4 and not 5? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -415,68 +415,111 @@ public Set<TopicPartition> completedChangelogs() { public void restore(final Map<TaskId, Task> tasks) { initializeChangelogs(tasks, registeredChangelogs()); - if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) { - throw new IllegalStateException("Should not be in standby updating state if there are still un-completed active changelogs"); + if (!activeRestoringChangelogs().isEmpty() + && state == ChangelogReaderState.STANDBY_UPDATING) { + throw new IllegalStateException( + "Should not be in standby updating state if there are still un-completed active changelogs"); } if (allChangelogsCompleted()) { log.debug("Finished restoring all changelogs {}", changelogs.keySet()); - return; - } + } else { + final Set<TopicPartition> restoringChangelogs = restoringChangelogs(); + if (!restoringChangelogs.isEmpty()) { + final ConsumerRecords<byte[], byte[]> polledRecords; - final Set<TopicPartition> restoringChangelogs = restoringChangelogs(); - if (!restoringChangelogs.isEmpty()) { - final ConsumerRecords<byte[], byte[]> polledRecords; + try { + // for restoring active and updating standby we may prefer different poll time + // in order to make sure we call the main consumer#poll in time. + // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern Review Comment: I think these comments should be moved before the call to `restoreConsumer.poll()`. BTW, this again confirms that inline comments are a poor way to document code, most of the times. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies,
+            pausedTopologies);
+
+        final Task mockTask = createMockTask(UNNAMED_TOPOLOGY);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask, 0));
+        // This pauses an UNNAMED_TOPOLOGY / a KafkaStreams instance without named/modular
+        // topologies.
+        pausedTopologies.add(UNNAMED_TOPOLOGY);
+        Assert.assertFalse(metadata.canProcessTask(mockTask, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBePausedIndependently() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.add(TOPOLOGY1);
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBeStartedPaused() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+        pausedTopologies.add(TOPOLOGY1);
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        pausedTopologies.remove(TOPOLOGY1);
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBackoff() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);
+
+        final Task mockTask1 = createMockTask(TOPOLOGY1);
+        final Task mockTask2 = createMockTask(TOPOLOGY2);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask1, 0));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 0));
+
+        metadata.registerTaskError(mockTask1, new Throwable("Error"), 0);
+        // One second after the error, task1 cannot process, task2 can.
+        Assert.assertFalse(metadata.canProcessTask(mockTask1, 1000));
+        Assert.assertTrue(metadata.canProcessTask(mockTask2, 1000));
+

Review Comment:
   Could you add a case where the time since the error is exactly the backoff time (i.e. 5000)? I would prefer to put the times 1000, 5000, and 10000 into variables with meaningful names instead of adding comments.
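For illustration, a sketch of the named-constant variant the comment asks for. The constant names are illustrative, 5000 ms is assumed to match the backoff constant used by `TaskExecutionMetadata`, and the expected outcome exactly at the boundary depends on whether the backoff check is inclusive, so that assertion is left to be pinned down:

```java
private static final long TIME_WITHIN_BACKOFF_MS = 1000L;
private static final long TIME_AT_BACKOFF_EXPIRY_MS = 5000L;   // assumed backoff constant
private static final long TIME_AFTER_BACKOFF_MS = 10000L;

@Test
public void testNamedTopologiesCanBackoff() {
    final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
    final TaskExecutionMetadata metadata =
        new TaskExecutionMetadata(NAMED_TOPOLOGIES, pausedTopologies);

    final Task mockTask1 = createMockTask(TOPOLOGY1);
    final Task mockTask2 = createMockTask(TOPOLOGY2);

    metadata.registerTaskError(mockTask1, new Throwable("Error"), 0);

    // Within the backoff window: task1 cannot process, task2 can.
    Assert.assertFalse(metadata.canProcessTask(mockTask1, TIME_WITHIN_BACKOFF_MS));
    Assert.assertTrue(metadata.canProcessTask(mockTask2, TIME_WITHIN_BACKOFF_MS));

    // Requested boundary case: exactly the backoff time after the error.
    // Replace with assertTrue/assertFalse once the intended semantics are decided.
    metadata.canProcessTask(mockTask1, TIME_AT_BACKOFF_EXPIRY_MS);

    // Well past the backoff window: task1 is assumed to become processable again.
    Assert.assertTrue(metadata.canProcessTask(mockTask1, TIME_AFTER_BACKOFF_MS));
}
```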