[GitHub] [kafka] mjsax commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r477616241



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   Interesting... Thanks for clarification.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r477606207



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores")));

Review comment:
   Ah. Good point. We can leave as-is.









[GitHub] [kafka] mjsax commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-21 Thread GitBox


mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r474877105



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {

Review comment:
   Nit: no need to line-break

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the IOException situation

Review comment:
   The test seems to be self-explanatory, so we don't really need this comment.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
##
@@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws IOException {
 }
 }
 
+@Test
+public void shouldThrowIOExceptionWhenWritingToNotExistedFile() {
+final Map offsetsToWrite = Collections.singletonMap(
+new TopicPartition(topic, 0), 0L);
+
+// create a file with not existed path, and feed into OffsetCheckpoint

Review comment:
   That the path is "not existing" is already clear from 
`File("not_existing_dir/not_existing_file")`, so this comment is not necessary.
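
   For readers following along, here is a minimal standalone sketch of why such a path makes a write fail. It uses only `java.io` and does not touch `OffsetCheckpoint`; the class and method names (`MissingDirWriteSketch`, `writeFails`) are hypothetical and exist only for illustration.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: demonstrates that opening a file
// for writing fails when its parent directory does not exist.
final class MissingDirWriteSketch {

    // Returns true if writing to the target raised an IOException.
    static boolean writeFails(final File target) {
        try (FileOutputStream out = new FileOutputStream(target)) {
            out.write("placeholder".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (final IOException e) {
            // FileOutputStream throws FileNotFoundException (an IOException)
            // when the file cannot be created, e.g. the parent dir is missing.
            return true;
        }
    }
}
```

   With a path such as `not_existing_dir/not_existing_file`, `writeFails` is expected to return `true` as long as the parent directory really is absent.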

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores")));

Review comment:
   Subjective: Might be "better" to do the assertion after the try-block.
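
   A rough sketch of the shape this suggestion has in mind: collect the messages inside the try-with-resources block and check them after the resource is closed. `MessageCapture` below is a hypothetical stand-in written for this example, not Kafka's `LogCaptureAppender`, and the logged string is only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

final class AssertAfterTrySketch {

    // Hypothetical stand-in for a log-capturing appender.
    static final class MessageCapture implements AutoCloseable {
        private final List<String> messages = new ArrayList<>();
        private boolean closed = false;

        void log(final String message) {
            if (!closed) {
                messages.add(message);
            }
        }

        List<String> getMessages() {
            return new ArrayList<>(messages);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs the "code under test" inside the try block and returns the
    // captured messages so the caller can assert on them afterwards.
    static List<String> runAndCapture() {
        final List<String> captured = new ArrayList<>();
        try (MessageCapture capture = new MessageCapture()) {
            capture.log("Failed to write offset checkpoint file");
            captured.addAll(capture.getMessages());
        }
        // The capture is closed here; assertions on `captured` go after this point.
        return captured;
    }
}
```

   The try block then only scopes the capture, and the assertion sits at the top level of the test.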

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+final Map offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   Why do we need to add `.tmp`?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public vo