This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3faa6cf6d0 MINOR: Use mock time in DefaultStateUpdaterTest (#12344) 3faa6cf6d0 is described below commit 3faa6cf6d060887288fcf68adb8c3f1e2090b8ed Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Jun 29 12:33:00 2022 -0700 MINOR: Use mock time in DefaultStateUpdaterTest (#12344) For most tests we would need an auto-ticking mock timer to work with draining-with-timeout functions. For tests that check for never checkpoint we need no auto-ticking timer to control exactly how much time elapsed. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../processor/internals/DefaultStateUpdater.java | 5 ++-- .../internals/DefaultStateUpdaterTest.java | 34 ++++++++++++---------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 0e84574c5c..886a37b314 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -356,8 +356,6 @@ public class DefaultStateUpdater implements StateUpdater { this.offsetResetter = offsetResetter; this.time = time; this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); - // initialize the last commit as of now to prevent first commit happens immediately - this.lastCommitMs = time.milliseconds(); } public void start() { @@ -365,6 +363,9 @@ public class DefaultStateUpdater implements StateUpdater { stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter); stateUpdaterThread.start(); shutdownGate = new CountDownLatch(1); + + // initialize the last commit as of now to prevent first commit happens immediately + this.lastCommitMs = time.milliseconds(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 8bd81828f6..5e2d90de71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; @@ -44,7 +45,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.common.utils.Utils.sleep; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyBoolean; @@ -79,24 +79,25 @@ class DefaultStateUpdaterTest { private final static TaskId TASK_1_0 = new TaskId(1, 0); private final static TaskId TASK_1_1 = new TaskId(1, 1); - private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); + // need an auto-tick timer to work for draining with timeout + private final Time time = new MockTime(1L); + private final StreamsConfig config = new StreamsConfig(configProps()); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { }; - - private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time); @AfterEach public void tearDown() { stateUpdater.shutdown(Duration.ofMinutes(1)); } - private Properties configProps(final int commitInterval) { + private Properties configProps() { return mkObjectProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval), - mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval) + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL), + mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL) )); } @@ -437,16 +438,16 @@ class DefaultStateUpdaterTest { @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.singleton(TOPIC_PARTITION_A_0)); + shouldNotRemoveTaskFromRestoredActiveTasks(task); } @Test public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0); - shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.emptySet()); + shouldNotRemoveTaskFromRestoredActiveTasks(task); } - private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task, final Set<TopicPartition> completedChangelogs) throws Exception { + private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception { final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -701,8 +702,10 @@ class DefaultStateUpdaterTest { stateUpdater.add(task2); stateUpdater.add(task3); stateUpdater.add(task4); + // wait for all tasks added to the thread before advance timer + verifyUpdatingTasks(task1, task2, task3, task4); - sleep(COMMIT_INTERVAL); + time.sleep(COMMIT_INTERVAL + 1); verifyExceptionsAndFailedTasks(); verifyCheckpointTasks(false, task1, task2, task3, task4); @@ -710,10 +713,9 @@ class DefaultStateUpdaterTest { @Test public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() { - stateUpdater.shutdown(Duration.ofMinutes(1)); - final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); - stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); - + // we need to use a non auto-ticking timer here to control how much time elapsed exactly + final Time time = new MockTime(); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time); try { final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); @@ -727,6 +729,8 @@ class DefaultStateUpdaterTest { stateUpdater.add(task3); stateUpdater.add(task4); + time.sleep(COMMIT_INTERVAL); + verifyNeverCheckpointTasks(task1, task2, task3, task4); } finally { stateUpdater.shutdown(Duration.ofMinutes(1));