This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 0f2f7e6  MINOR: improve logging of tasks on shutdown (#7597)
0f2f7e6 is described below

commit 0f2f7e689429cbc2c4474149386d2a2d37264716
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Tue Oct 29 10:15:39 2019 -0700

    MINOR: improve logging of tasks on shutdown (#7597)
    
    Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../streams/processor/internals/AbstractTask.java  |  5 +-
 .../processor/internals/AssignedStandbyTasks.java  | 10 ++++
 .../processor/internals/AssignedStreamsTasks.java  | 62 +++++++++++++---------
 .../streams/processor/internals/AssignedTasks.java |  2 +-
 .../processor/internals/StoreChangelogReader.java  |  2 +
 .../streams/processor/internals/TaskManager.java   |  7 +--
 .../internals/AssignedStreamsTasksTest.java        |  2 +-
 .../processor/internals/StreamTaskTest.java        | 11 ++--
 .../processor/internals/StreamThreadTest.java      | 18 ++++---
 .../processor/internals/TaskManagerTest.java       |  4 +-
 10 files changed, 75 insertions(+), 48 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d8494fa..ede47ba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -72,7 +72,8 @@ public abstract class AbstractTask implements Task {
         this.eosEnabled = 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
         this.stateDirectory = stateDirectory;
 
-        this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" 
: "task", id);
+        final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
+        this.logPrefix = threadIdPrefix + String.format("%s [%s] ", isStandby 
? "standby-task" : "task", id);
         this.logContext = new LogContext(logPrefix);
         this.log = logContext.logger(getClass());
 
@@ -197,7 +198,7 @@ public abstract class AbstractTask implements Task {
         log.trace("Initializing state stores");
 
         for (final StateStore store : topology.stateStores()) {
-            log.trace("Initializing store {}", store.name());
+            log.debug("Initializing store {}", store.name());
             processorContext.uninitialize();
             store.init(processorContext, store);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 9783970..0c9a70d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -31,6 +31,16 @@ class AssignedStandbyTasks extends 
AssignedTasks<StandbyTask> {
     }
 
     @Override
+    public void shutdown(final boolean clean) {
+        final String shutdownType = clean ? "Clean" : "Unclean";
+        log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
+                      "created tasks to close: {}" + "\n" +
+                      "running tasks to close: {}",
+            clean, created.keySet(), running.keySet());
+        super.shutdown(clean);
+    }
+
+    @Override
     int commit() {
         final int committed = super.commit();
         // TODO: this contortion would not be necessary if we got rid of the 
two-step
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index cba17d0..3515824 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -111,7 +111,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             } else if (restoring.containsKey(task)) {
                 revokedRestoringTasks.add(task);
             } else if (!suspended.containsKey(task)) {
-                log.warn("Task {} was revoked but cannot be found in the 
assignment, may have been closed due to error", task);
+                log.warn("Stream task {} was revoked but cannot be found in 
the assignment, may have been closed due to error", task);
             }
         }
 
@@ -126,7 +126,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                                                  final List<TopicPartition> 
taskChangelogs) {
 
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
-        log.debug("Suspending running {} {}", taskTypeName, running.keySet());
+        log.debug("Suspending the running stream tasks {}", running.keySet());
 
         for (final TaskId id : runningTasksToSuspend) {
             final StreamTask task = running.get(id);
@@ -136,20 +136,20 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 suspended.put(id, task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // swallow and move on since we are rebalancing
-                log.info("Failed to suspend {} {} since it got migrated to 
another thread already. " +
-                    "Closing it as zombie and move on.", taskTypeName, id);
+                log.info("Failed to suspend the stream task {} since it got 
migrated to another thread already. " +
+                    "Closing it as zombie and moving on.", id);
                 firstException.compareAndSet(null, closeZombieTask(task));
                 prevActiveTasks.remove(id);
             } catch (final RuntimeException e) {
-                log.error("Suspending {} {} failed due to the following 
error:", taskTypeName, id, e);
+                log.error("Suspending the stream task {} failed due to the 
following error:", id, e);
                 firstException.compareAndSet(null, e);
                 try {
                     prevActiveTasks.remove(id);
                     task.close(false, false);
                 } catch (final RuntimeException f) {
                     log.error(
-                        "After suspending failed, closing the same {} {} 
failed again due to the following error:",
-                        taskTypeName, id, f);
+                        "After suspending failed, closing the same stream task 
{} failed again due to the following error:",
+                        id, f);
                 }
             } finally {
                 running.remove(id);
@@ -159,14 +159,14 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             }
         }
 
-        log.trace("Successfully suspended the running {} {}", taskTypeName, 
suspended.keySet());
+        log.trace("Successfully suspended the running stream task {}", 
suspended.keySet());
 
         return firstException.get();
     }
 
     private RuntimeException closeNonRunningTasks(final Set<TaskId> 
nonRunningTasksToClose,
                                                   final List<TopicPartition> 
closedTaskChangelogs) {
-        log.debug("Closing the created but not initialized {} {}", 
taskTypeName, nonRunningTasksToClose);
+        log.debug("Closing the created but not initialized stream tasks {}", 
nonRunningTasksToClose);
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>();
 
         for (final TaskId id : nonRunningTasksToClose) {
@@ -202,7 +202,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             final boolean clean = !isZombie;
             task.close(clean, isZombie);
         } catch (final RuntimeException e) {
-            log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
+            log.error("Failed to close the stream task {}", task.id(), e);
             return e;
         }
 
@@ -218,7 +218,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         try {
             task.close(false, isZombie);
         } catch (final RuntimeException e) {
-            log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
+            log.error("Failed to close the stream task {}", task.id(), e);
             return e;
         }
 
@@ -239,7 +239,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             final boolean clean = !isZombie;
             task.closeStateManager(clean);
         } catch (final RuntimeException e) {
-            log.error("Failed to close restoring task {} due to the following 
error:", task.id(), e);
+            log.error("Failed to close the restoring stream task {} due to the 
following error:", task.id(), e);
             return e;
         }
 
@@ -254,7 +254,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             final boolean clean = !isZombie;
             task.closeSuspended(clean, null);
         } catch (final RuntimeException e) {
-            log.error("Failed to close suspended {} {} due to the following 
error:", taskTypeName, task.id(), e);
+            log.error("Failed to close the suspended stream task {} due to the 
following error:", task.id(), e);
             return e;
         }
 
@@ -262,7 +262,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     }
 
     RuntimeException closeNotAssignedSuspendedTasks(final Set<TaskId> 
revokedTasks) {
-        log.debug("Closing the revoked active tasks {} {}", taskTypeName, 
revokedTasks);
+        log.debug("Closing the revoked active stream tasks {}", revokedTasks);
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
         for (final TaskId revokedTask : revokedTasks) {
@@ -271,7 +271,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             if (suspendedTask != null) {
                 firstException.compareAndSet(null, closeSuspended(false, 
suspendedTask));
             } else {
-                log.debug("Revoked task {} could not be found in suspended, 
may have already been closed", revokedTask);
+                log.debug("Revoked stream task {} could not be found in 
suspended, may have already been closed", revokedTask);
             }
         }
         return firstException.get();
@@ -301,7 +301,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
 
         // With the current rebalance protocol, there should not be any 
running tasks left as they were all lost
         if (!prevActiveTasks.isEmpty()) {
-            log.error("Found still running {} after closing all tasks lost as 
zombies", taskTypeName);
+            log.error("Found the still running stream tasks {} after closing 
all tasks lost as zombies", prevActiveTasks);
             firstException.compareAndSet(null, new IllegalStateException("Not 
all lost tasks were closed as zombies"));
         }
         return firstException.get();
@@ -314,7 +314,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                                      final Set<TopicPartition> partitions) {
         if (suspended.containsKey(taskId)) {
             final StreamTask task = suspended.get(taskId);
-            log.trace("Found suspended {} {}", taskTypeName, taskId);
+            log.trace("Found suspended stream task {}", taskId);
             suspended.remove(taskId);
 
             if (task.partitions().equals(partitions)) {
@@ -324,8 +324,8 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since 
this function
                     // is triggered in the rebalance callback
-                    log.info("Failed to resume {} {} since it got migrated to 
another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", taskTypeName, task.id());
+                    log.info("Failed to resume the stream task {} since it got 
migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
                     final RuntimeException fatalException = 
closeZombieTask(task);
                     running.remove(taskId);
 
@@ -334,10 +334,10 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                     }
                     throw e;
                 }
-                log.trace("Resuming suspended {} {}", taskTypeName, task.id());
+                log.trace("Resuming the suspended stream task {}", task.id());
                 return true;
             } else {
-                log.warn("Couldn't resume task {} assigned partitions {}, task 
partitions {}", taskId, partitions, task.partitions());
+                log.warn("Couldn't resume stream task {} assigned partitions 
{}, task partitions {}", taskId, partitions, task.partitions());
                 task.closeSuspended(true, null);
             }
         }
@@ -398,10 +398,10 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 if (task.commitRequested() && task.commitNeeded()) {
                     task.commit();
                     committed++;
-                    log.debug("Committed active task {} per user request in", 
task.id());
+                    log.debug("Committed stream task {} per user request in", 
task.id());
                 }
             } catch (final TaskMigratedException e) {
-                log.info("Failed to commit {} since it got migrated to another 
thread already. " +
+                log.info("Failed to commit stream task {} since it got 
migrated to another thread already. " +
                         "Closing it as zombie before triggering a new 
rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
@@ -410,9 +410,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 it.remove();
                 throw e;
             } catch (final RuntimeException t) {
-                log.error("Failed to commit StreamTask {} due to the following 
error:",
-                        task.id(),
-                        t);
+                log.error("Failed to commit stream task {} due to the 
following error:", task.id(), t);
                 if (firstException == null) {
                     firstException = t;
                 }
@@ -510,6 +508,18 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         suspended.clear();
     }
 
+    @Override
+    public void shutdown(final boolean clean) {
+        final String shutdownType = clean ? "Clean" : "Unclean";
+        log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
+                      "non-initialized tasks to close: {}" + "\n" +
+                      "restoring tasks to close: {}" + "\n" +
+                      "running tasks to close: {}" + "\n" +
+                      "suspended tasks to close: {}",
+            clean, created.keySet(), restoring.keySet(), running.keySet(), 
suspended.keySet());
+        super.shutdown(clean);
+    }
+
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         builder.append(super.toString(indent));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8c4c68f..56fefe0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -218,7 +218,7 @@ abstract class AssignedTasks<T extends Task> {
         return committed;
     }
 
-    void close(final boolean clean) {
+    void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
         for (final T task: allTasks()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 55a33c0..81f76a3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -204,6 +204,8 @@ public class StoreChangelogReader implements 
ChangelogReader {
                         restorer.checkpoint(),
                         restoreToOffsets.get(partition));
                 
restorer.setStartingOffset(restoreConsumer.position(partition));
+
+                log.debug("Calling restorer for partition {} of task {}", 
partition, active.restoringTaskFor(partition));
                 restorer.restoreStarted();
             } else {
                 log.trace("Did not find checkpoint from changelog {} for store 
{}, rewinding to beginning.", partition, restorer.storeName());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a2dac40..a9ccbf5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -279,15 +279,12 @@ public class TaskManager {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        log.debug("Shutting down all active tasks {}, standby tasks {}, and 
suspended tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
-                  active.suspendedTaskIds());
-
         try {
-            active.close(clean);
+            active.shutdown(clean);
         } catch (final RuntimeException fatalException) {
             firstException.compareAndSet(null, fatalException);
         }
-        standby.close(clean);
+        standby.shutdown(clean);
 
         // remove the changelog partitions from restore consumer
         try {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fd9f0cc..a8f96e4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -546,7 +546,7 @@ public class AssignedStreamsTasksTest {
         assignedTasks.initializeNewTasks();
         
assertNull(assignedTasks.suspendOrCloseTasks(assignedTasks.allAssignedTaskIds(),
 revokedChangelogs));
 
-        assignedTasks.close(true);
+        assignedTasks.shutdown(true);
     }
 
     private void addAndInitTask() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index cf33fc4..59cc2a5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -267,7 +267,9 @@ public class StreamTaskTest {
             assertTimeoutErrorLog(appender);
 
             // make sure we report the correct message
-            assertThat(expected.getMessage(), is("task [0_0] Failed to 
initialize task 0_0 due to timeout."));
+            assertThat(
+                expected.getMessage(),
+                is("stream-thread [" + Thread.currentThread().getName() + "] 
task [0_0] Failed to initialize task 0_0 due to timeout."));
 
             // make sure we preserve the cause
             assertEquals(expected.getCause().getClass(), 
TimeoutException.class);
@@ -326,7 +328,9 @@ public class StreamTaskTest {
             assertTimeoutErrorLog(appender);
 
             // make sure we report the correct message
-            assertThat(expected.getMessage(), is("task [0_0] Failed to 
initialize task 0_0 due to timeout."));
+            assertThat(
+                expected.getMessage(),
+                is("stream-thread [" + Thread.currentThread().getName() + "] 
task [0_0] Failed to initialize task 0_0 due to timeout."));
 
             // make sure we preserve the cause
             assertEquals(expected.getCause().getClass(), 
TimeoutException.class);
@@ -338,7 +342,7 @@ public class StreamTaskTest {
     private void assertTimeoutErrorLog(final LogCaptureAppender appender) {
 
         final String expectedErrorLogMessage =
-            "task [0_0] Timeout exception caught when initializing 
transactions for task 0_0. " +
+            "stream-thread [" + Thread.currentThread().getName() + "] task 
[0_0] Timeout exception caught when initializing transactions for task 0_0. " +
                 "This might happen if the broker is slow to respond, if the 
network " +
                 "connection to the broker was interrupted, or if similar 
circumstances arise. " +
                 "You can increase producer parameter `max.block.ms` to 
increase this timeout.";
@@ -1758,4 +1762,5 @@ public class StreamTaskTest {
             recordValue
         );
     }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 73cf768..5b56eb8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1550,8 +1550,8 @@ public class StreamThreadTest {
 
         LogCaptureAppender.unregister(appender);
         final List<String> strings = appender.getMessages();
-        assertTrue(strings.contains("task [0_1] Skipping record due to 
deserialization error. topic=[topic1] partition=[1] offset=[0]"));
-        assertTrue(strings.contains("task [0_1] Skipping record due to 
deserialization error. topic=[topic1] partition=[1] offset=[1]"));
+        assertTrue(strings.contains("stream-thread [" + 
Thread.currentThread().getName() + "] task [0_1] Skipping record due to 
deserialization error. topic=[topic1] partition=[1] offset=[0]"));
+        assertTrue(strings.contains("stream-thread [" + 
Thread.currentThread().getName() + "] task [0_1] Skipping record due to 
deserialization error. topic=[topic1] partition=[1] offset=[1]"));
     }
 
     @Test
@@ -1618,33 +1618,35 @@ public class StreamThreadTest {
 
         LogCaptureAppender.unregister(appender);
         final List<String> strings = appender.getMessages();
+
+        final String threadTaskPrefix = "stream-thread [" + 
Thread.currentThread().getName() + "] task [0_1] ";
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[0] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[1] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[2] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[3] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[4] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
         assertTrue(strings.contains(
-            "task [0_1] Skipping record due to negative extracted timestamp. " 
+
+            threadTaskPrefix + "Skipping record due to negative extracted 
timestamp. " +
                 "topic=[topic1] partition=[1] offset=[5] 
extractedTimestamp=[-1] " +
                 
"extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
         ));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index e46a4cc..8e4b2c2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -375,7 +375,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCloseActiveTasksOnShutdown() {
-        active.close(true);
+        active.shutdown(true);
         expectLastCall();
         replay();
 
@@ -385,7 +385,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCloseStandbyTasksOnShutdown() {
-        standby.close(false);
+        standby.shutdown(false);
         expectLastCall();
         replay();
 

Reply via email to