mjsax commented on code in PR #18752: URL: https://github.com/apache/kafka/pull/18752#discussion_r1976509566
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168

Review Comment:
   Or maybe even remove the comment entirely?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -96,6 +96,7 @@ public Map<TopicPartition, Long> initialize() {
         }
         initTopology();
         processorContext.initialize();
+        this.flushState();

Review Comment:
   ```suggestion
           flushState();
   ```

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168

Review Comment:
   ```suggestion
           // Reset after initialization since checkpointing should happen during initialization
   ```

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -333,4 +343,21 @@ public void shouldWipeGlobalStateDirectory() throws Exception {
         globalStateTask.close(true);
         assertFalse(stateMgr.baseDir().exists());
     }
+
+    @Test
+    public void shouldCheckpointDuringInitialization() {
+        globalStateTask.initialize();
+
+        assertTrue(stateMgr.checkpointWritten);
+        assertTrue(stateMgr.flushed);
+    }
+
+    @Test
+    public void shouldCheckpointDuringClose() throws Exception {
+        globalStateTask.initialize();
+        globalStateTask.close(false);

Review Comment:
   It seems we need to reset both flags after `initialize()` but before `close()`?
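
To illustrate the point about resetting the flags, here is a minimal, self-contained sketch. `FakeStateManager`, `SketchTask`, and `ResetFlagsSketch` are hypothetical stand-ins, not the real Kafka classes; only the flag names `flushed` and `checkpointWritten` come from the PR. It assumes, as the diff suggests, that `initialize()` now flushes and checkpoints. Under that assumption, a test that does not reset the flags would pass even if `close()` never flushed at all.

```java
// Hypothetical stand-in for the test's state manager stub.
// Only the two flag names are taken from the PR; everything else is assumed.
class FakeStateManager {
    boolean flushed = false;
    boolean checkpointWritten = false;

    void flush() { flushed = true; }
    void checkpoint() { checkpointWritten = true; }
}

// Simplified stand-in for the task under test.
// Assumption: initialize() always flushes; close() flushes only if flushOnClose is set,
// which lets us simulate a regression where close() forgets to flush.
class SketchTask {
    private final FakeStateManager stateMgr;
    private final boolean flushOnClose;

    SketchTask(final FakeStateManager stateMgr, final boolean flushOnClose) {
        this.stateMgr = stateMgr;
        this.flushOnClose = flushOnClose;
    }

    void initialize() { flushState(); }

    void flushState() {
        stateMgr.flush();
        stateMgr.checkpoint();
    }

    void close() {
        if (flushOnClose) {
            flushState();
        }
    }
}

class ResetFlagsSketch {
    // Returns whether both flags are set after close(),
    // optionally resetting them between initialize() and close().
    static boolean flagsSetAfterClose(final boolean flushOnClose, final boolean resetAfterInit) {
        final FakeStateManager stateMgr = new FakeStateManager();
        final SketchTask task = new SketchTask(stateMgr, flushOnClose);
        task.initialize();
        if (resetAfterInit) {
            stateMgr.flushed = false;
            stateMgr.checkpointWritten = false;
        }
        task.close();
        return stateMgr.flushed && stateMgr.checkpointWritten;
    }

    public static void main(final String[] args) {
        // No reset: the flags left over from initialize() hide a close() that never flushes.
        System.out.println(flagsSetAfterClose(false, false)); // true (false positive)
        // With reset: the broken close() is detected.
        System.out.println(flagsSetAfterClose(false, true));  // false
        // With reset: a close() that does flush is confirmed.
        System.out.println(flagsSetAfterClose(true, true));   // true
    }
}
```

With the reset in place, the assertions after `close()` can only pass because of work done in `close()` itself.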
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
     }

     public void close(final boolean wipeStateStore) throws IOException {
+        this.flushState();

Review Comment:
   ```suggestion
           flushState();
   ```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
     }

     public void close(final boolean wipeStateStore) throws IOException {
+        this.flushState();

Review Comment:
   It does not seem to make sense to flush the state if `wipeStateStore == true`? Not really a problem, as we are closing anyway, but it still feels off.

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during initialization, KAFKA-18168
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;

Review Comment:
   Nit: it might be slightly better to do this a little later in the test, i.e., just before we call `globalStateTask.maybeCheckpoint()`. The test logic might be easier for humans to read as (1) reset flags, (2) checkpoint, (3) verify flags.

   ```
   globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes()));
   time.sleep(flushInterval); // flush interval elapsed

   stateMgr.checkpointWritten = false;
   stateMgr.flushed = false;

   globalStateTask.maybeCheckpoint();

   assertEquals(offsets, stateMgr.changelogOffsets());
   assertFalse(stateMgr.flushed);
   assertFalse(stateMgr.checkpointWritten);
   ```

   Similarly in the other tests.
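
For the earlier remark about flushing when `wipeStateStore == true`, one possible shape for the guard is sketched below. This is not the PR's code: `RecordingStateManager`, `GuardedCloseTask`, `GuardedCloseSketch`, and the `wipe()` method are hypothetical names used only to make the call order observable, and the real `close()` may wipe the state directory differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical state manager that records the order of calls it receives.
class RecordingStateManager {
    final List<String> calls = new ArrayList<>();

    void flush() { calls.add("flush"); }
    void checkpoint() { calls.add("checkpoint"); }
    void close() { calls.add("close"); }
    void wipe() { calls.add("wipe"); } // assumed stand-in for deleting the state directory
}

// Simplified stand-in for the task; only the close(boolean wipeStateStore) signature mirrors the diff.
class GuardedCloseTask {
    private final RecordingStateManager stateMgr;

    GuardedCloseTask(final RecordingStateManager stateMgr) {
        this.stateMgr = stateMgr;
    }

    void flushState() {
        stateMgr.flush();
        stateMgr.checkpoint();
    }

    void close(final boolean wipeStateStore) {
        // Skip the flush when the state is about to be wiped anyway.
        if (!wipeStateStore) {
            flushState();
        }
        stateMgr.close();
        if (wipeStateStore) {
            stateMgr.wipe();
        }
    }
}

class GuardedCloseSketch {
    // Returns the recorded call order for a single close(wipeStateStore) invocation.
    static List<String> callsFor(final boolean wipeStateStore) {
        final RecordingStateManager stateMgr = new RecordingStateManager();
        new GuardedCloseTask(stateMgr).close(wipeStateStore);
        return stateMgr.calls;
    }

    public static void main(final String[] args) {
        System.out.println(callsFor(false)); // [flush, checkpoint, close]
        System.out.println(callsFor(true));  // [close, wipe]
    }
}
```

Whether the guard is worth the extra branch is a judgment call: as the comment notes, flushing before a wipe is harmless, only redundant.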