This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b492296  MINOR: fix checkpoint write failure warning log (#6008)
b492296 is described below

commit b49229675748afae584799e11a8b8f090c4218a9
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sat Dec 8 21:00:57 2018 -0600

    MINOR: fix checkpoint write failure warning log (#6008)
    
    We saw a log statement in which the cause of the failure to write a 
checkpoint was not properly logged.
    This change logs the exception properly and also verifies the log message.
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../processor/internals/ProcessorStateManager.java |  2 +-
 .../internals/ProcessorStateManagerTest.java       | 40 +++++++++++++++++
 .../internals/testutil/LogCaptureAppender.java     | 50 ++++++++++++++++++++++
 3 files changed, 91 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3d0c664..d08779f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -292,7 +292,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
         try {
             checkpoint.write(this.checkpointableOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to {}: {}", 
checkpoint, e);
+            log.warn("Failed to write offset checkpoint file to [{}]", 
checkpoint, e);
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 3d91ee0..183f9ae 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockKeyValueStore;
@@ -51,7 +52,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singletonList;
+import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
@@ -564,6 +567,43 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    // if the optional is absent, it'll throw an exception and fail the test.
+    @SuppressWarnings("OptionalGetWithoutIsPresent")
+    @Test
+    public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+
+        final ProcessorStateManager stateMgr;
+        try {
+            stateMgr = new ProcessorStateManager(
+                taskId,
+                noPartitions,
+                false,
+                stateDirectory,
+                Collections.singletonMap(persistentStore.name(), 
persistentStoreTopicName),
+                changelogReader,
+                false,
+                logContext);
+        } catch (final IOException e) {
+            e.printStackTrace();
+            throw new AssertionError(e);
+        }
+        stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
+
+        stateDirectory.clean();
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 
10L));
+        LogCaptureAppender.unregister(appender);
+
+        final List<LogCaptureAppender.Event> messages = appender.getEvents();
+
+        final LogCaptureAppender.Event lastEvent = 
messages.get(messages.size() - 1);
+
+        assertThat(lastEvent.getLevel(), is("WARN"));
+        assertThat(lastEvent.getMessage(), 
startsWith("process-state-manager-test Failed to write offset checkpoint file 
to ["));
+        assertThat(lastEvent.getMessage(), endsWith(".checkpoint]"));
+        assertThat(lastEvent.getThrowableInfo().get(), 
startsWith("java.io.FileNotFoundException: "));
+    }
+
     @Test
     public void shouldFlushAllStoresEvenIfStoreThrowsException() throws 
IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
index ffb8799..a1f7b31 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -24,10 +24,36 @@ import org.apache.log4j.spi.LoggingEvent;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 
 public class LogCaptureAppender extends AppenderSkeleton {
     private final LinkedList<LoggingEvent> events = new LinkedList<>();
 
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    public static class Event {
+        private String level;
+        private String message;
+        private Optional<String> throwableInfo;
+
+        Event(final String level, final String message, final Optional<String> 
throwableInfo) {
+            this.level = level;
+            this.message = message;
+            this.throwableInfo = throwableInfo;
+        }
+
+        public String getLevel() {
+            return level;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public Optional<String> getThrowableInfo() {
+            return throwableInfo;
+        }
+    }
+
     public static LogCaptureAppender createAndRegister() {
         final LogCaptureAppender logCaptureAppender = new LogCaptureAppender();
         Logger.getRootLogger().addAppender(logCaptureAppender);
@@ -59,6 +85,30 @@ public class LogCaptureAppender extends AppenderSkeleton {
         return result;
     }
 
+    public List<Event> getEvents() {
+        final LinkedList<Event> result = new LinkedList<>();
+        synchronized (events) {
+            for (final LoggingEvent event : events) {
+                final String[] throwableStrRep = event.getThrowableStrRep();
+                final Optional<String> throwableString;
+                if (throwableStrRep == null) {
+                    throwableString = Optional.empty();
+                } else {
+                    final StringBuilder throwableStringBuilder = new 
StringBuilder();
+
+                    for (final String s : throwableStrRep) {
+                        throwableStringBuilder.append(s);
+                    }
+
+                    throwableString = 
Optional.of(throwableStringBuilder.toString());
+                }
+
+                result.add(new Event(event.getLevel().toString(), 
event.getRenderedMessage(), throwableString));
+            }
+        }
+        return result;
+    }
+
     @Override
     public void close() {
 

Reply via email to