mjsax commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r467291346
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ########## @@ -93,9 +90,7 @@ public boolean isActive() { public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); - - // initialize the snapshot with the current offsets as we don't need to commit then until they change - offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); + initializeCheckpoint(); Review comment: In the old code, we actually get a copy of the `Map`, while within `initializeCheckpoint()` we don't -- is this on purpose? Is it safe? Also, do we actually need the method? Wasn't the old code doing exactly the same thing? It's just a one-liner method -- what do we gain? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ########## @@ -49,6 +59,30 @@ this.stateDirectory = stateDirectory; } + protected void initializeCheckpoint() { + // we will delete the local checkpoint file after registering the state stores and loading them into the + // state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed Review comment: Seems the comment is outdated? 
`we should initialize the snapshot as empty` ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ########## @@ -1638,21 +1689,22 @@ public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() { task.suspend(); task.prepareCommit(); - task.postCommit(); + task.postCommit(false); assertEquals(Task.State.SUSPENDED, task.state()); EasyMock.verify(stateManager); } @Test - public void shouldSwallowExceptionOnCloseCleanError() { + public void shouldThrowExceptionOnCloseCleanError() { final long offset = 543L; EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset))); - EasyMock.expectLastCall(); + stateManager.checkpoint(); + EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); Review comment: as above? (more below... won't add comments each time) ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ########## @@ -412,7 +412,7 @@ public void shouldInitializeOffsetsFromCheckpointFile() throws IOException { stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); stateMgr.initializeStoreOffsetsFromCheckpoint(true); - assertFalse(checkpointFile.exists()); + assertTrue(checkpointFile.exists()); Review comment: Should we add a test for EOS, that the checkpoint file is deleted? 
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ########## @@ -1259,13 +1262,47 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException { } @Test - public void shouldCheckpointOffsetsOnCommit() { + public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { final Long offset = 543L; EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)); + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(changelogPartition, 0L)) + .andReturn(Collections.singletonMap(changelogPartition, 10L)) + .andReturn(Collections.singletonMap(changelogPartition, 20L)); + stateManager.registerStore(stateStore, stateStore.stateRestoreCallback); EasyMock.expectLastCall(); + EasyMock.replay(stateManager, recordCollector); + + task = createStatefulTask(createConfig(false, "100"), true); + + task.initializeIfNeeded(); + task.completeRestoration(); + + task.prepareCommit(); + task.postCommit(true); + + task.prepareCommit(); + task.postCommit(false); + + EasyMock.verify(recordCollector); Review comment: Should we verify `stateManager`, too? 
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ########## @@ -1560,42 +1641,13 @@ public void shouldCheckpointWithCreatedStateOnClose() { } @Test - public void shouldNotCommitAndThrowOnCloseDirty() { - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); - stateManager.checkpoint(EasyMock.anyObject()); - EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); - EasyMock.replay(stateManager, recordCollector); - - final MetricName metricName = setupCloseTaskMetric(); - - task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); - - task.initializeIfNeeded(); - task.completeRestoration(); - - task.suspend(); - task.closeDirty(); - - assertEquals(Task.State.CLOSED, task.state()); - assertTrue(source1.initialized); - assertTrue(source1.closed); - - final double expectedCloseTaskMetric = 1.0; - verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); - } - - @Test - public void shouldCheckpointOnCloseRestoring() { + public void shouldNotCheckpointOnCloseRestoringIfNoProgress() { stateManager.flush(); - EasyMock.expectLastCall(); - stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes(); + stateManager.checkpoint(); + EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); Review comment: Similar to above: instead of throwing, shouldn't it be sufficient to just not register any expected call? 
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ########## @@ -1458,50 +1498,91 @@ public void shouldThrowIfPostCommittingOnIllegalState() { task.transitionTo(Task.State.SUSPENDED); task.transitionTo(Task.State.CLOSED); - assertThrows(IllegalStateException.class, task::postCommit); + assertThrows(IllegalStateException.class, () -> task.postCommit(true)); } @Test public void shouldSkipCheckpointingSuspendedCreatedTask() { - stateManager.checkpoint(EasyMock.anyObject()); + stateManager.checkpoint(); EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint")); EasyMock.replay(stateManager); task = createStatefulTask(createConfig(false, "100"), true); task.suspend(); - task.postCommit(); + task.postCommit(true); } @Test - public void shouldCheckpointWithEmptyOffsetsForSuspendedRestoringTask() { - stateManager.checkpoint(emptyMap()); + public void shouldCheckpointForSuspendedTask() { + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(partition1, 0L)) + .andReturn(Collections.singletonMap(partition1, 1L)); EasyMock.replay(stateManager); task = createStatefulTask(createConfig(false, "100"), true); task.initializeIfNeeded(); task.suspend(); - task.postCommit(); + task.postCommit(true); EasyMock.verify(stateManager); } @Test - public void shouldCheckpointWithEmptyOffsetsForSuspendedRunningTaskWithNoCommitNeeded() { - stateManager.checkpoint(emptyMap()); + public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() { + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(partition1, 1L)) + .andReturn(Collections.singletonMap(partition1, 2L)) + .andReturn(Collections.singletonMap(partition1, 3L)); + stateManager.checkpoint(); + EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); Review 
comment: Why do we need to set up an exception? If we don't set up any expected call at all, shouldn't it fail automatically? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ########## @@ -207,15 +207,25 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() { EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.flush(); EasyMock.expectLastCall(); - stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); + stateManager.checkpoint(); + EasyMock.expectLastCall(); + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(partition, 50L)) + .andReturn(Collections.singletonMap(partition, 11000L)) + .andReturn(Collections.singletonMap(partition, 11000L)); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); EasyMock.replay(stateManager); task = createStandbyTask(); task.initializeIfNeeded(); task.prepareCommit(); - task.postCommit(); + task.postCommit(false); // this should not checkpoint Review comment: It's unclear to me how we actually verify that the checkpointing happened. Above, we have ``` stateManager.checkpoint(); EasyMock.expectLastCall(); ``` but it only helps to verify that we checkpoint a single time, not which of the three calls does the checkpointing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org