This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 0f2f7e6 MINOR: improve logging of tasks on shutdown (#7597) 0f2f7e6 is described below commit 0f2f7e689429cbc2c4474149386d2a2d37264716 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Tue Oct 29 10:15:39 2019 -0700 MINOR: improve logging of tasks on shutdown (#7597) Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/processor/internals/AbstractTask.java | 5 +- .../processor/internals/AssignedStandbyTasks.java | 10 ++++ .../processor/internals/AssignedStreamsTasks.java | 62 +++++++++++++--------- .../streams/processor/internals/AssignedTasks.java | 2 +- .../processor/internals/StoreChangelogReader.java | 2 + .../streams/processor/internals/TaskManager.java | 7 +-- .../internals/AssignedStreamsTasksTest.java | 2 +- .../processor/internals/StreamTaskTest.java | 11 ++-- .../processor/internals/StreamThreadTest.java | 18 ++++--- .../processor/internals/TaskManagerTest.java | 4 +- 10 files changed, 75 insertions(+), 48 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d8494fa..ede47ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -72,7 +72,8 @@ public abstract class AbstractTask implements Task { this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); this.stateDirectory = stateDirectory; - this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" : "task", id); + final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); + this.logPrefix = threadIdPrefix + String.format("%s [%s] ", isStandby ? "standby-task" : "task", id); this.logContext = new LogContext(logPrefix); this.log = logContext.logger(getClass()); @@ -197,7 +198,7 @@ public abstract class AbstractTask implements Task { log.trace("Initializing state stores"); for (final StateStore store : topology.stateStores()) { - log.trace("Initializing store {}", store.name()); + log.debug("Initializing store {}", store.name()); processorContext.uninitialize(); store.init(processorContext, store); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java index 9783970..0c9a70d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java @@ -31,6 +31,16 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> { } @Override + public void shutdown(final boolean clean) { + final String shutdownType = clean ? "Clean" : "Unclean"; + log.debug(shutdownType + " shutdown of all standby tasks" + "\n" + + "created tasks to close: {}" + "\n" + + "running tasks to close: {}", + clean, created.keySet(), running.keySet()); + super.shutdown(clean); + } + + @Override int commit() { final int committed = super.commit(); // TODO: this contortion would not be necessary if we got rid of the two-step diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index cba17d0..3515824 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -111,7 +111,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } else if (restoring.containsKey(task)) { revokedRestoringTasks.add(task); } else if (!suspended.containsKey(task)) { - log.warn("Task {} was revoked but cannot be found in the assignment, may have been closed due to error", task); + log.warn("Stream task {} was revoked but cannot be found in the assignment, may have been closed due to error", task); } } @@ -126,7 +126,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin final List<TopicPartition> taskChangelogs) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - log.debug("Suspending running {} {}", taskTypeName, running.keySet()); + log.debug("Suspending the running stream tasks {}", running.keySet()); for (final TaskId id : runningTasksToSuspend) { final StreamTask task = running.get(id); @@ -136,20 +136,20 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin suspended.put(id, task); } catch (final TaskMigratedException closeAsZombieAndSwallow) { // swallow and move on since we are rebalancing - log.info("Failed to suspend {} {} since it got migrated to another thread already. " + - "Closing it as zombie and move on.", taskTypeName, id); + log.info("Failed to suspend the stream task {} since it got migrated to another thread already. " + + "Closing it as zombie and moving on.", id); firstException.compareAndSet(null, closeZombieTask(task)); prevActiveTasks.remove(id); } catch (final RuntimeException e) { - log.error("Suspending {} {} failed due to the following error:", taskTypeName, id, e); + log.error("Suspending the stream task {} failed due to the following error:", id, e); firstException.compareAndSet(null, e); try { prevActiveTasks.remove(id); task.close(false, false); } catch (final RuntimeException f) { log.error( - "After suspending failed, closing the same {} {} failed again due to the following error:", - taskTypeName, id, f); + "After suspending failed, closing the same stream task {} failed again due to the following error:", + id, f); } } finally { running.remove(id); @@ -159,14 +159,14 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } } - log.trace("Successfully suspended the running {} {}", taskTypeName, suspended.keySet()); + log.trace("Successfully suspended the running stream task {}", suspended.keySet()); return firstException.get(); } private RuntimeException closeNonRunningTasks(final Set<TaskId> nonRunningTasksToClose, final List<TopicPartition> closedTaskChangelogs) { - log.debug("Closing the created but not initialized {} {}", taskTypeName, nonRunningTasksToClose); + log.debug("Closing the created but not initialized stream tasks {}", nonRunningTasksToClose); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(); for (final TaskId id : nonRunningTasksToClose) { @@ -202,7 +202,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin final boolean clean = !isZombie; task.close(clean, isZombie); } catch (final RuntimeException e) { - log.error("Failed to close {}, {}", taskTypeName, task.id(), e); + log.error("Failed to close the stream task {}", task.id(), e); return e; } @@ -218,7 +218,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin try { task.close(false, isZombie); } catch (final RuntimeException e) { - log.error("Failed to close {}, {}", taskTypeName, task.id(), e); + log.error("Failed to close the stream task {}", task.id(), e); return e; } @@ -239,7 +239,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin final boolean clean = !isZombie; task.closeStateManager(clean); } catch (final RuntimeException e) { - log.error("Failed to close restoring task {} due to the following error:", task.id(), e); + log.error("Failed to close the restoring stream task {} due to the following error:", task.id(), e); return e; } @@ -254,7 +254,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin final boolean clean = !isZombie; task.closeSuspended(clean, null); } catch (final RuntimeException e) { - log.error("Failed to close suspended {} {} due to the following error:", taskTypeName, task.id(), e); + log.error("Failed to close the suspended stream task {} due to the following error:", task.id(), e); return e; } @@ -262,7 +262,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } RuntimeException closeNotAssignedSuspendedTasks(final Set<TaskId> revokedTasks) { - log.debug("Closing the revoked active tasks {} {}", taskTypeName, revokedTasks); + log.debug("Closing the revoked active stream tasks {}", revokedTasks); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); for (final TaskId revokedTask : revokedTasks) { @@ -271,7 +271,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin if (suspendedTask != null) { firstException.compareAndSet(null, closeSuspended(false, suspendedTask)); } else { - log.debug("Revoked task {} could not be found in suspended, may have already been closed", revokedTask); + log.debug("Revoked stream task {} could not be found in suspended, may have already been closed", revokedTask); } } return firstException.get(); @@ -301,7 +301,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin // With the current rebalance protocol, there should not be any running tasks left as they were all lost if (!prevActiveTasks.isEmpty()) { - log.error("Found still running {} after closing all tasks lost as zombies", taskTypeName); + log.error("Found the still running stream tasks {} after closing all tasks lost as zombies", prevActiveTasks); firstException.compareAndSet(null, new IllegalStateException("Not all lost tasks were closed as zombies")); } return firstException.get(); @@ -314,7 +314,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin final Set<TopicPartition> partitions) { if (suspended.containsKey(taskId)) { final StreamTask task = suspended.get(taskId); - log.trace("Found suspended {} {}", taskTypeName, taskId); + log.trace("Found suspended stream task {}", taskId); suspended.remove(taskId); if (task.partitions().equals(partitions)) { @@ -324,8 +324,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } catch (final TaskMigratedException e) { // we need to catch migration exception internally since this function // is triggered in the rebalance callback - log.info("Failed to resume {} {} since it got migrated to another thread already. " + - "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); + log.info("Failed to resume the stream task {} since it got migrated to another thread already. " + + "Closing it as zombie before triggering a new rebalance.", task.id()); final RuntimeException fatalException = closeZombieTask(task); running.remove(taskId); @@ -334,10 +334,10 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } throw e; } - log.trace("Resuming suspended {} {}", taskTypeName, task.id()); + log.trace("Resuming the suspended stream task {}", task.id()); return true; } else { - log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); + log.warn("Couldn't resume stream task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); task.closeSuspended(true, null); } } @@ -398,10 +398,10 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin if (task.commitRequested() && task.commitNeeded()) { task.commit(); committed++; - log.debug("Committed active task {} per user request in", task.id()); + log.debug("Committed stream task {} per user request in", task.id()); } } catch (final TaskMigratedException e) { - log.info("Failed to commit {} since it got migrated to another thread already. " + + log.info("Failed to commit stream task {} since it got migrated to another thread already. " + "Closing it as zombie before triggering a new rebalance.", task.id()); final RuntimeException fatalException = closeZombieTask(task); if (fatalException != null) { @@ -410,9 +410,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin it.remove(); throw e; } catch (final RuntimeException t) { - log.error("Failed to commit StreamTask {} due to the following error:", - task.id(), - t); + log.error("Failed to commit stream task {} due to the following error:", task.id(), t); if (firstException == null) { firstException = t; } @@ -510,6 +508,18 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin suspended.clear(); } + @Override + public void shutdown(final boolean clean) { + final String shutdownType = clean ? "Clean" : "Unclean"; + log.debug(shutdownType + " shutdown of all active tasks" + "\n" + + "non-initialized tasks to close: {}" + "\n" + + "restoring tasks to close: {}" + "\n" + + "running tasks to close: {}" + "\n" + + "suspended tasks to close: {}", + clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet()); + super.shutdown(clean); + } + public String toString(final String indent) { final StringBuilder builder = new StringBuilder(); builder.append(super.toString(indent)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 8c4c68f..56fefe0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -218,7 +218,7 @@ abstract class AssignedTasks<T extends Task> { return committed; } - void close(final boolean clean) { + void shutdown(final boolean clean) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); for (final T task: allTasks()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 55a33c0..81f76a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -204,6 +204,8 @@ public class StoreChangelogReader implements ChangelogReader { restorer.checkpoint(), restoreToOffsets.get(partition)); restorer.setStartingOffset(restoreConsumer.position(partition)); + + log.debug("Calling restorer for partition {} of task {}", partition, active.restoringTaskFor(partition)); restorer.restoreStarted(); } else { log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index a2dac40..a9ccbf5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -279,15 +279,12 @@ public class TaskManager { void shutdown(final boolean clean) { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - log.debug("Shutting down all active tasks {}, standby tasks {}, and suspended tasks {}", active.runningTaskIds(), standby.runningTaskIds(), - active.suspendedTaskIds()); - try { - active.close(clean); + active.shutdown(clean); } catch (final RuntimeException fatalException) { firstException.compareAndSet(null, fatalException); } - standby.close(clean); + standby.shutdown(clean); // remove the changelog partitions from restore consumer try { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index fd9f0cc..a8f96e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -546,7 +546,7 @@ public class AssignedStreamsTasksTest { assignedTasks.initializeNewTasks(); assertNull(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(), revokedChangelogs)); - assignedTasks.close(true); + assignedTasks.shutdown(true); } private void addAndInitTask() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index cf33fc4..59cc2a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -267,7 +267,9 @@ public class StreamTaskTest { assertTimeoutErrorLog(appender); // make sure we report the correct message - assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + assertThat( + expected.getMessage(), + is("stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Failed to initialize task 0_0 due to timeout.")); // make sure we preserve the cause assertEquals(expected.getCause().getClass(), TimeoutException.class); @@ -326,7 +328,9 @@ public class StreamTaskTest { assertTimeoutErrorLog(appender); // make sure we report the correct message - assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + assertThat( + expected.getMessage(), + is("stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Failed to initialize task 0_0 due to timeout.")); // make sure we preserve the cause assertEquals(expected.getCause().getClass(), TimeoutException.class); @@ -338,7 +342,7 @@ public class StreamTaskTest { private void assertTimeoutErrorLog(final LogCaptureAppender appender) { final String expectedErrorLogMessage = - "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " + + "stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Timeout exception caught when initializing transactions for task 0_0. " + "This might happen if the broker is slow to respond, if the network " + "connection to the broker was interrupted, or if similar circumstances arise. " + "You can increase producer parameter `max.block.ms` to increase this timeout."; @@ -1758,4 +1762,5 @@ public class StreamTaskTest { recordValue ); } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 73cf768..5b56eb8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1550,8 +1550,8 @@ public class StreamThreadTest { LogCaptureAppender.unregister(appender); final List<String> strings = appender.getMessages(); - assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]")); - assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]")); + assertTrue(strings.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]")); + assertTrue(strings.contains("stream-thread [" + Thread.currentThread().getName() + "] task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]")); } @Test @@ -1618,33 +1618,35 @@ public class StreamThreadTest { LogCaptureAppender.unregister(appender); final List<String> strings = appender.getMessages(); + + final String threadTaskPrefix = "stream-thread [" + Thread.currentThread().getName() + "] task [0_1] "; assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); assertTrue(strings.contains( - "task [0_1] Skipping record due to negative extracted timestamp. " + + threadTaskPrefix + "Skipping record due to negative extracted timestamp. " + "topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] " + "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]" )); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index e46a4cc..8e4b2c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -375,7 +375,7 @@ public class TaskManagerTest { @Test public void shouldCloseActiveTasksOnShutdown() { - active.close(true); + active.shutdown(true); expectLastCall(); replay(); @@ -385,7 +385,7 @@ public class TaskManagerTest { @Test public void shouldCloseStandbyTasksOnShutdown() { - standby.close(false); + standby.shutdown(false); expectLastCall(); replay();