mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r474877105
##
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws
IOException {
assertEquals(expected, offsets);
}
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws
IOException {
+final Map offsets = Collections.singletonMap(t1,
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
Review comment:
Nit: no need to line-break
##
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws
IOException {
assertEquals(expected, offsets);
}
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws
IOException {
+final Map offsets = Collections.singletonMap(t1,
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the
IOException situation
Review comment:
The test seems to be self-explanatory, so we don't really need this
comment.
##
File path:
streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
##
@@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws
IOException {
}
}
+@Test
+public void shouldThrowIOExceptionWhenWritingToNotExistedFile() {
+final Map offsetsToWrite =
Collections.singletonMap(
+new TopicPartition(topic, 0), 0L);
+
+// create a file with not existed path, and feed into OffsetCheckpoint
Review comment:
That the path is "not existing" is already clear from
`File("not_existing_dir/not_existing_file")`, so this comment is not necessary.
##
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws
IOException {
assertEquals(expected, offsets);
}
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws
IOException {
+final Map offsets = Collections.singletonMap(t1,
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " +
checkpointFile.getPath() + " for global stores")));
Review comment:
Subjective: Might be "better" to do the assertion after the try-block.
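For illustration only, a minimal sketch of that shape. `CapturingAppender` and `checkpoint(...)` here are hypothetical stand-ins (not `LogCaptureAppender` or `GlobalStateManagerImpl`, whose real APIs are not reproduced), and the message text is made up. The idea is to copy the captured messages inside the try-with-resources block and assert once the appender has been closed:

```java
import java.util.ArrayList;
import java.util.List;

public class AssertAfterTryExample {

    // Hypothetical stand-in for a log-capturing appender; not Kafka's class.
    static final class CapturingAppender implements AutoCloseable {
        private final List<String> messages = new ArrayList<>();
        private boolean closed = false;

        void append(final String message) {
            if (!closed) {
                messages.add(message);
            }
        }

        List<String> getMessages() {
            return new ArrayList<>(messages);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical operation under test: logs a warning instead of throwing.
    static void checkpoint(final CapturingAppender appender) {
        appender.append("Failed to write offset checkpoint file");
    }

    static List<String> runAndCapture() {
        final List<String> captured = new ArrayList<>();
        try (final CapturingAppender appender = new CapturingAppender()) {
            checkpoint(appender);
            // Only collect inside the block; do not assert here.
            captured.addAll(appender.getMessages());
        }
        return captured;
    }

    public static void main(final String[] args) {
        final List<String> messages = runAndCapture();
        // The assertion runs after the try-block, once the appender is closed.
        if (!messages.contains("Failed to write offset checkpoint file")) {
            throw new AssertionError("expected warning was not captured");
        }
    }
}
```

With this layout the try-block only covers the code that needs the appender, and a failing assertion is not raised while the resource is still open.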
##
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws
IOException {
assertEquals(expected, offsets);
}
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws
IOException {
+final Map offsets = Collections.singletonMap(t1,
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(),
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
Review comment:
Why do we need to add `.tmp`?
##
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws
IOException {
assertEquals(expected, offsets);
}
+@Test
+public vo