mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r467291346



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -93,9 +90,7 @@ public boolean isActive() {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
-
-            // initialize the snapshot with the current offsets as we don't need to commit then until they change
-            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+            initializeCheckpoint();

Review comment:
       In the old code, we actually get a copy of the `Map`, while within `initializeCheckpoint()` we don't. Is this on purpose? Is it safe?
   
   Also, do we actually need the method? The old code was doing exactly the same thing inline, and it's just a one-liner. What do we gain?
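A minimal sketch of why the copy can matter. It assumes, hypothetically, that `changelogOffsets()` hands back a live, mutable map that the state manager keeps updating; `FakeStateManager` and `SnapshotAliasingDemo` are illustrative names, not Kafka internals. If the snapshot aliases that map, a later "has anything changed since the snapshot?" comparison always answers no.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state manager whose changelogOffsets()
// returns its internal, mutable map (an assumption for illustration).
class FakeStateManager {
    private final Map<String, Long> offsets = new HashMap<>();

    Map<String, Long> changelogOffsets() {
        return offsets; // live view: later updates are visible to callers
    }

    void advance(String partition, long offset) {
        offsets.put(partition, offset);
    }
}

class SnapshotAliasingDemo {
    // Returns true if the snapshot differs from the current offsets,
    // i.e. a checkpoint would be considered necessary.
    static boolean changedSinceSnapshot(boolean copySnapshot) {
        FakeStateManager stateMgr = new FakeStateManager();
        stateMgr.advance("changelog-0", 0L);

        Map<String, Long> snapshot = copySnapshot
                ? new HashMap<>(stateMgr.changelogOffsets()) // defensive copy
                : stateMgr.changelogOffsets();               // aliases the live map

        stateMgr.advance("changelog-0", 42L); // processing makes progress

        return !snapshot.equals(stateMgr.changelogOffsets());
    }

    public static void main(String[] args) {
        System.out.println("with copy:    changed = " + changedSinceSnapshot(true));
        System.out.println("without copy: changed = " + changedSinceSnapshot(false));
    }
}
```

Whether this bites in practice depends on whether `changelogOffsets()` returns a fresh map or a view of internal state, which is exactly what the question above is asking.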

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +59,30 @@
         this.stateDirectory = stateDirectory;
     }
 
+    protected void initializeCheckpoint() {
+        // we will delete the local checkpoint file after registering the state stores and loading them into the
+        // state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed

Review comment:
       Seems the comment is outdated? (`we should initialize the snapshot as empty`)

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1638,21 +1689,22 @@ public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() {
 
         task.suspend();
         task.prepareCommit();
-        task.postCommit();
+        task.postCommit(false);
 
         assertEquals(Task.State.SUSPENDED, task.state());
 
         EasyMock.verify(stateManager);
     }
 
     @Test
-    public void shouldSwallowExceptionOnCloseCleanError() {
+    public void shouldThrowExceptionOnCloseCleanError() {
         final long offset = 543L;
 
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset)));
-        EasyMock.expectLastCall();
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();

Review comment:
       Same as above? (There are more instances below; I won't add a comment for each one.)

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -412,7 +412,7 @@ public void shouldInitializeOffsetsFromCheckpointFile() throws IOException {
             stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             stateMgr.initializeStoreOffsetsFromCheckpoint(true);
 
-            assertFalse(checkpointFile.exists());
+            assertTrue(checkpointFile.exists());

Review comment:
       Should we add a test for EOS that verifies the checkpoint file is deleted?
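A rough, self-contained shape for such a test. It assumes (as the question implies, not confirmed here) that with EOS enabled the checkpoint file is deleted once its offsets have been loaded, while without EOS it is kept. `CheckpointLoader.loadOffsets` is a hypothetical stand-in for `ProcessorStateManager.initializeStoreOffsetsFromCheckpoint`, not its real implementation, and the file contents are dummy data.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the state manager's checkpoint handling (not Kafka code).
class CheckpointLoader {
    private final boolean eosEnabled;

    CheckpointLoader(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    // Reads the checkpoint file and, under the assumed EOS behavior, deletes it
    // afterwards; without EOS the file is left in place.
    void loadOffsets(Path checkpointFile) throws IOException {
        if (!Files.exists(checkpointFile)) {
            return;
        }
        Files.readAllLines(checkpointFile, StandardCharsets.UTF_8); // offsets would be parsed here
        if (eosEnabled) {
            Files.delete(checkpointFile);
        }
    }
}

class EosCheckpointFileTest {
    // Writes a dummy checkpoint file, loads it, and reports whether it survived.
    static boolean checkpointFileExistsAfterLoad(boolean eosEnabled) {
        try {
            Path stateDir = Files.createTempDirectory("state-dir");
            Path checkpointFile = stateDir.resolve(".checkpoint"); // file name is illustrative
            Files.write(checkpointFile, "dummy offsets".getBytes(StandardCharsets.UTF_8));

            new CheckpointLoader(eosEnabled).loadOffsets(checkpointFile);
            boolean exists = Files.exists(checkpointFile);

            // Clean up the temporary state directory.
            Files.deleteIfExists(checkpointFile);
            Files.delete(stateDir);
            return exists;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-EOS keeps checkpoint file: " + checkpointFileExistsAfterLoad(false));
        System.out.println("EOS deletes checkpoint file:   " + !checkpointFileExistsAfterLoad(true));
    }
}
```

In the real test the EOS variant would presumably mirror `shouldInitializeOffsetsFromCheckpointFile`, with an EOS-enabled state manager and `assertFalse(checkpointFile.exists())`.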

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1259,13 +1262,47 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException {
     }
 
     @Test
-    public void shouldCheckpointOffsetsOnCommit() {
+    public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         final Long offset = 543L;
 
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
-        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset)));
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.singletonMap(changelogPartition, 0L))
+                .andReturn(Collections.singletonMap(changelogPartition, 10L))
+                .andReturn(Collections.singletonMap(changelogPartition, 20L));
+        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
         EasyMock.expectLastCall();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+
+        task.initializeIfNeeded();
+        task.completeRestoration();
+
+        task.prepareCommit();
+        task.postCommit(true);
+
+        task.prepareCommit();
+        task.postCommit(false);
+
+        EasyMock.verify(recordCollector);

Review comment:
       Should we verify `stateManager`, too?
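Without `EasyMock.verify(stateManager)`, the `.once()` expectation on `stateManager.checkpoint()` is never checked, so the test would still pass if no checkpoint happened at all. The sketch below uses a hand-rolled counting fake instead of EasyMock to show what that verification adds. It assumes, hypothetically, that `postCommit(enforce)` checkpoints when `enforce` is true or when the total offset delta since the last checkpoint exceeds a threshold; the 10,000 value, `CountingStateManager`, and `SketchTask` are all illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hand-rolled fake: returns scripted offsets and counts checkpoint() calls.
class CountingStateManager {
    private final Deque<Map<String, Long>> scriptedOffsets = new ArrayDeque<>();
    int checkpointCalls = 0;

    void script(String partition, long... offsets) {
        for (long offset : offsets) {
            Map<String, Long> m = new HashMap<>();
            m.put(partition, offset);
            scriptedOffsets.add(m);
        }
    }

    Map<String, Long> changelogOffsets() {
        return scriptedOffsets.poll(); // next scripted value (null if exhausted)
    }

    void checkpoint() {
        checkpointCalls++;
    }

    // Analogue of EasyMock.verify(stateManager) for the checkpoint expectation.
    void verifyCheckpointCalls(int expected) {
        if (checkpointCalls != expected) {
            throw new AssertionError("expected " + expected + " checkpoint call(s) but got " + checkpointCalls);
        }
    }
}

// Hypothetical task: snapshot taken at init, checkpoint when enforced or after large progress.
class SketchTask {
    static final long OFFSET_DELTA_THRESHOLD = 10_000L; // assumed value
    private final CountingStateManager stateMgr;
    private Map<String, Long> snapshot;

    SketchTask(CountingStateManager stateMgr) {
        this.stateMgr = stateMgr;
    }

    void initializeIfNeeded() {
        snapshot = new HashMap<>(stateMgr.changelogOffsets());
    }

    void postCommit(boolean enforceCheckpoint) {
        Map<String, Long> current = stateMgr.changelogOffsets();
        long delta = 0L;
        for (Map.Entry<String, Long> e : current.entrySet()) {
            delta += e.getValue() - snapshot.getOrDefault(e.getKey(), 0L);
        }
        if (enforceCheckpoint || delta > OFFSET_DELTA_THRESHOLD) {
            stateMgr.checkpoint();
            snapshot = new HashMap<>(current);
        }
    }
}

class VerifyStateManagerDemo {
    static int runScenario() {
        CountingStateManager stateMgr = new CountingStateManager();
        stateMgr.script("changelog-0", 0L, 10L, 20L); // same offsets as the test above
        SketchTask task = new SketchTask(stateMgr);
        task.initializeIfNeeded();   // snapshot = 0
        task.postCommit(true);       // enforced: checkpoint, snapshot = 10
        task.postCommit(false);      // delta of 10 is below the threshold: no checkpoint
        stateMgr.verifyCheckpointCalls(1);
        return stateMgr.checkpointCalls;
    }

    public static void main(String[] args) {
        System.out.println("checkpoint calls: " + runScenario());
    }
}
```

Dropping the `verifyCheckpointCalls(1)` line is the analogue of verifying only `recordCollector`: a regression that skipped the enforced checkpoint would go unnoticed.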

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1560,42 +1641,13 @@ public void shouldCheckpointWithCreatedStateOnClose() {
     }
 
     @Test
-    public void shouldNotCommitAndThrowOnCloseDirty() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.checkpoint(EasyMock.anyObject());
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
-        final MetricName metricName = setupCloseTaskMetric();
-
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
-
-        task.initializeIfNeeded();
-        task.completeRestoration();
-
-        task.suspend();
-        task.closeDirty();
-
-        assertEquals(Task.State.CLOSED, task.state());
-        assertTrue(source1.initialized);
-        assertTrue(source1.closed);
-
-        final double expectedCloseTaskMetric = 1.0;
-        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
-        EasyMock.verify(stateManager);
-    }
-
-    @Test
-    public void shouldCheckpointOnCloseRestoring() {
+    public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
         stateManager.flush();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();

Review comment:
       Similar to above: instead of throwing, shouldn't it be sufficient to just not register any expected call?
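The idea can be illustrated without EasyMock. If `stateManager` is a default EasyMock mock (not a nice mock), a call with no recorded expectation already fails with an `AssertionError` ("Unexpected method call"), so an explicit `andThrow(new AssertionError(...))` mainly adds a custom message; with a nice mock, unexpected calls are silently accepted, which may be why the explicit throw was used. The sketch below imitates the strict behavior with `java.lang.reflect.Proxy`; `StateManagerLike` and `strictMock` are hypothetical names, and this is not how EasyMock is implemented internally.

```java
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Set;

// Hypothetical interface standing in for the mocked state manager.
interface StateManagerLike {
    void flush();
    void checkpoint();
}

class StrictMockDemo {
    // Creates a proxy that allows only the named methods; any other call fails,
    // mimicking how a default (non-nice) mock rejects unexpected calls.
    static StateManagerLike strictMock(Set<String> allowedMethods) {
        return (StateManagerLike) Proxy.newProxyInstance(
                StateManagerLike.class.getClassLoader(),
                new Class<?>[] {StateManagerLike.class},
                (proxy, method, args) -> {
                    if (!allowedMethods.contains(method.getName())) {
                        throw new AssertionError("Unexpected method call " + method.getName() + "()");
                    }
                    return null; // every interface method here returns void
                });
    }

    // Returns true if calling checkpoint() on a mock with no expectations fails.
    static boolean unexpectedCheckpointFails() {
        StateManagerLike mock = strictMock(Collections.emptySet()); // no expectations recorded
        try {
            mock.checkpoint();
            return false;
        } catch (AssertionError expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unexpected checkpoint() rejected: " + unexpectedCheckpointFails());
    }
}
```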

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1458,50 +1498,91 @@ public void shouldThrowIfPostCommittingOnIllegalState() {
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
-        assertThrows(IllegalStateException.class, task::postCommit);
+        assertThrows(IllegalStateException.class, () -> task.postCommit(true));
     }
 
     @Test
     public void shouldSkipCheckpointingSuspendedCreatedTask() {
-        stateManager.checkpoint(EasyMock.anyObject());
+        stateManager.checkpoint();
         EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
         EasyMock.replay(stateManager);
 
         task = createStatefulTask(createConfig(false, "100"), true);
         task.suspend();
-        task.postCommit();
+        task.postCommit(true);
     }
 
     @Test
-    public void shouldCheckpointWithEmptyOffsetsForSuspendedRestoringTask() {
-        stateManager.checkpoint(emptyMap());
+    public void shouldCheckpointForSuspendedTask() {
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.singletonMap(partition1, 0L))
+                .andReturn(Collections.singletonMap(partition1, 1L));
         EasyMock.replay(stateManager);
 
         task = createStatefulTask(createConfig(false, "100"), true);
         task.initializeIfNeeded();
         task.suspend();
-        task.postCommit();
+        task.postCommit(true);
         EasyMock.verify(stateManager);
     }
 
     @Test
-    public void shouldCheckpointWithEmptyOffsetsForSuspendedRunningTaskWithNoCommitNeeded() {
-        stateManager.checkpoint(emptyMap());
+    public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.singletonMap(partition1, 1L))
+                .andReturn(Collections.singletonMap(partition1, 2L))
+                .andReturn(Collections.singletonMap(partition1, 3L));
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();

Review comment:
       Why do we need to set up an exception? If we don't set up any expected call at all, shouldn't it fail automatically?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
##########
@@ -207,15 +207,25 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() {
         EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.flush();
         EasyMock.expectLastCall();
-        stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
+        stateManager.checkpoint();
+        EasyMock.expectLastCall();
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.singletonMap(partition, 50L))
+                .andReturn(Collections.singletonMap(partition, 11000L))
+                .andReturn(Collections.singletonMap(partition, 11000L));
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
         EasyMock.replay(stateManager);
 
         task = createStandbyTask();
         task.initializeIfNeeded();
         task.prepareCommit();
-        task.postCommit();
+        task.postCommit(false);  // this should not checkpoint

Review comment:
       It's unclear to me how we actually verify that the checkpointing happened. Above, we have
   ```
   stateManager.checkpoint();
   EasyMock.expectLastCall();
   ```
   
   but that only verifies that we checkpoint a single time, not which of the three calls does the checkpointing.
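One way to pin down which call checkpoints is to record the state observed at the moment `checkpoint()` runs, rather than only counting calls. In EasyMock this could presumably be done by capturing state in an `andAnswer(...)` callback on the `checkpoint()` expectation, or by using a strict mock (`EasyMock.createStrictMock`) so that the relative order of `changelogOffsets()` and `checkpoint()` calls is verified. The sketch below uses a plain hand-rolled fake instead; `RecordingStateManager`, the scripted offsets, the threshold, and the commit logic are all hypothetical and do not model the exact test above.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-rolled fake that records which offset was current whenever checkpoint() ran.
class RecordingStateManager {
    private final long[] scriptedOffsets;
    private int next = 0;
    private long lastReturned = -1L;
    final List<Long> checkpointedAt = new ArrayList<>();

    RecordingStateManager(long... scriptedOffsets) {
        this.scriptedOffsets = scriptedOffsets;
    }

    // Returns the next scripted changelog offset, like a stubbed changelogOffsets().
    long changelogOffset() {
        lastReturned = scriptedOffsets[next++];
        return lastReturned;
    }

    // Captures the state at call time instead of just counting the call.
    void checkpoint() {
        checkpointedAt.add(lastReturned);
    }
}

class WhichCommitCheckpointed {
    static final long THRESHOLD = 10_000L; // assumed threshold, for illustration only

    static List<Long> run() {
        RecordingStateManager stateMgr = new RecordingStateManager(50L, 500L, 20_000L);
        long snapshot = stateMgr.changelogOffset(); // taken at initialization

        for (int commit = 0; commit < 2; commit++) {
            long current = stateMgr.changelogOffset();
            // hypothetical postCommit(false): checkpoint only after large progress
            if (current - snapshot > THRESHOLD) {
                stateMgr.checkpoint();
                snapshot = current;
            }
        }
        return stateMgr.checkpointedAt;
    }

    public static void main(String[] args) {
        // Only the second commit (offset 20000) should have triggered a checkpoint.
        System.out.println("checkpointed at offsets: " + run());
    }
}
```

Asserting on the recorded offsets tells the test not just that a checkpoint happened once, but on which commit it happened.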




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

