This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b492296 MINOR: fix checkpoint write failure warning log (#6008) b492296 is described below commit b49229675748afae584799e11a8b8f090c4218a9 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Sat Dec 8 21:00:57 2018 -0600 MINOR: fix checkpoint write failure warning log (#6008) We saw a log statement in which the cause of the failure to write a checkpoint was not properly logged. This change logs the exception properly and also verifies the log message. Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../processor/internals/ProcessorStateManager.java | 2 +- .../internals/ProcessorStateManagerTest.java | 40 +++++++++++++++++ .../internals/testutil/LogCaptureAppender.java | 50 ++++++++++++++++++++++ 3 files changed, 91 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3d0c664..d08779f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -292,7 +292,7 @@ public class ProcessorStateManager extends AbstractStateManager { try { checkpoint.write(this.checkpointableOffsets); } catch (final IOException e) { - log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e); + log.warn("Failed to write offset checkpoint file to [{}]", checkpoint, e); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 3d91ee0..183f9ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockBatchingStateRestoreListener; import org.apache.kafka.test.MockKeyValueStore; @@ -51,7 +52,9 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; @@ -564,6 +567,43 @@ public class ProcessorStateManagerTest { } } + // if the optional is absent, it'll throw an exception and fail the test. + @SuppressWarnings("OptionalGetWithoutIsPresent") + @Test + public void shouldLogAWarningIfCheckpointThrowsAnIOException() { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final ProcessorStateManager stateMgr; + try { + stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), + changelogReader, + false, + logContext); + } catch (final IOException e) { + e.printStackTrace(); + throw new AssertionError(e); + } + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + + stateDirectory.clean(); + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L)); + LogCaptureAppender.unregister(appender); + + final List<LogCaptureAppender.Event> messages = appender.getEvents(); + + final LogCaptureAppender.Event lastEvent = messages.get(messages.size() - 1); + + assertThat(lastEvent.getLevel(), is("WARN")); + assertThat(lastEvent.getMessage(), startsWith("process-state-manager-test Failed to write offset checkpoint file to [")); + assertThat(lastEvent.getMessage(), endsWith(".checkpoint]")); + assertThat(lastEvent.getThrowableInfo().get(), startsWith("java.io.FileNotFoundException: ")); + } + @Test public void shouldFlushAllStoresEvenIfStoreThrowsException() throws IOException { final ProcessorStateManager stateManager = new ProcessorStateManager( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java index ffb8799..a1f7b31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java @@ -24,10 +24,36 @@ import org.apache.log4j.spi.LoggingEvent; import java.util.LinkedList; import java.util.List; +import java.util.Optional; public class LogCaptureAppender extends AppenderSkeleton { private final LinkedList<LoggingEvent> events = new LinkedList<>(); + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static class Event { + private String level; + private String message; + private Optional<String> throwableInfo; + + Event(final String level, final String message, final Optional<String> throwableInfo) { + this.level = level; + this.message = message; + this.throwableInfo = throwableInfo; + } + + public String getLevel() { + return level; + } + + public String getMessage() { + return message; + } + + public Optional<String> getThrowableInfo() { + return throwableInfo; + } + } + public static LogCaptureAppender createAndRegister() { final LogCaptureAppender logCaptureAppender = new LogCaptureAppender(); Logger.getRootLogger().addAppender(logCaptureAppender); @@ -59,6 +85,30 @@ public class LogCaptureAppender extends AppenderSkeleton { return result; } + public List<Event> getEvents() { + final LinkedList<Event> result = new LinkedList<>(); + synchronized (events) { + for (final LoggingEvent event : events) { + final String[] throwableStrRep = event.getThrowableStrRep(); + final Optional<String> throwableString; + if (throwableStrRep == null) { + throwableString = Optional.empty(); + } else { + final StringBuilder throwableStringBuilder = new StringBuilder(); + + for (final String s : throwableStrRep) { + throwableStringBuilder.append(s); + } + + throwableString = Optional.of(throwableStringBuilder.toString()); + } + + result.add(new Event(event.getLevel().toString(), event.getRenderedMessage(), throwableString)); + } + } + return result; + } + @Override public void close() {