mjsax commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r474877105
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } + @Test + public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(); + stateManager.updateChangelogOffsets(offsets); + + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); + file.createNewFile(); + // set the checkpoint tmp file to read-only to simulate the IOException situation + file.setWritable(false); + + try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { Review comment: Nit: no need to line-break ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } + @Test + public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(); + stateManager.updateChangelogOffsets(offsets); + + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); + file.createNewFile(); + // set the checkpoint tmp file to read-only to simulate the IOException situation Review comment: The test seems to be self-explaining and thus we don't really need this comment ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java ########## @@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws IOException { } } + @Test + public void shouldThrowIOExceptionWhenWritingToNotExistedFile() { + final Map<TopicPartition, Long> offsetsToWrite = Collections.singletonMap( + new TopicPartition(topic, 0), 0L); + + // create a file with not existed path, and feed into OffsetCheckpoint Review comment: that the pass is "not existing" is already clear from `File("not_existing_dir/not_existing_file")` and thus not necessary ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } + @Test + public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(); + stateManager.updateChangelogOffsets(offsets); + + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); + file.createNewFile(); + // set the checkpoint tmp file to read-only to simulate the IOException situation + file.setWritable(false); + + try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { + + // checkpoint should fail due to the file is readonly + stateManager.checkpoint(); + assertThat(appender.getMessages(), hasItem(containsString( + "Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores"))); Review comment: Subjective: Might be "better" to do the assertion after the try-block. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } + @Test + public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(); + stateManager.updateChangelogOffsets(offsets); + + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); Review comment: Why do we need to add `.tmp` ? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ########## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } + @Test + public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(); + stateManager.updateChangelogOffsets(offsets); + + final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); + file.createNewFile(); + // set the checkpoint tmp file to read-only to simulate the IOException situation + file.setWritable(false); + + try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { + + // checkpoint should fail due to the file is readonly Review comment: as above; comment is not really needed ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java ########## @@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws IOException { } } + @Test + public void shouldThrowIOExceptionWhenWritingToNotExistedFile() { + final Map<TopicPartition, Long> offsetsToWrite = Collections.singletonMap( + new TopicPartition(topic, 0), 0L); Review comment: nit: no need to line break ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org