mjsax commented on code in PR #18752:
URL: https://github.com/apache/kafka/pull/18752#discussion_r1976509566


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during 
initialization, KAFKA-18168

Review Comment:
   Or maybe even remove the commend entirely?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -96,6 +96,7 @@ public Map<TopicPartition, Long> initialize() {
         }
         initTopology();
         processorContext.initialize();
+        this.flushState();

Review Comment:
   ```suggestion
          flushState();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during 
initialization, KAFKA-18168

Review Comment:
   ```suggestion
           // Reset after initialization since checkpointing should happen 
during initialization
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -333,4 +343,21 @@ public void shouldWipeGlobalStateDirectory() throws 
Exception {
         globalStateTask.close(true);
         assertFalse(stateMgr.baseDir().exists());
     }
+
+    @Test
+    public void shouldCheckpointDuringInitialization() {
+        globalStateTask.initialize();
+
+        assertTrue(stateMgr.checkpointWritten);
+        assertTrue(stateMgr.flushed);
+    }
+
+    @Test
+    public void shouldCheckpointDuringClose() throws Exception {
+        globalStateTask.initialize();
+        globalStateTask.close(false);

Review Comment:
   Seems we need to reset both flags after `initialize()` but before `close()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
     }
 
     public void close(final boolean wipeStateStore) throws IOException {
+        this.flushState();

Review Comment:
   ```suggestion
          flushState();
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -138,6 +139,7 @@ public void flushState() {
     }
 
     public void close(final boolean wipeStateStore) throws IOException {
+        this.flushState();

Review Comment:
   It seems to not make sense to flush the state if `wipeStateStore == true` ? 
Not really a problem, as we are closing anyway, but it still feels like it's 
off.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##########
@@ -252,6 +252,10 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
     @Test
     public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
         globalStateTask.initialize();
+        // Reset after initialization since checkpointing should happen during 
initialization, KAFKA-18168
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;

Review Comment:
   Nit: it might be slightly better, to do this a little later in the test, ie, 
just before we call `globalStateTask.maybeCheckpoint()`? Test logic might be 
easier to reads for humans as (1) reset flags, (2) checkpoint, (3) verify flags.
   
   ```
           globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, 
"foo".getBytes(), "foo".getBytes()));
           time.sleep(flushInterval); // flush interval elapsed
           
           stateMgr.checkpointWritten = false;
           stateMgr.flushed = false;
           
           globalStateTask.maybeCheckpoint();
           
           assertEquals(offsets, stateMgr.changelogOffsets());
           assertFalse(stateMgr.flushed);
           assertFalse(stateMgr.checkpointWritten);
   ```
   
   Similar in other tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to