This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f4c8845  KAFKA-7672: Restoring tasks need to be closed upon task 
suspension (#6113)
f4c8845 is described below

commit f4c8845c57b93be411497f8e620349350ab735d7
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Feb 22 10:50:55 2019 -0800

    KAFKA-7672: Restoring tasks need to be closed upon task suspension (#6113)
    
    * In activeTasks.suspend, we should close all restoring tasks as well. 
Closing restoring tasks does not require `task.close` as in 
`closeNonRunningTasks`, since the topology is not initialized yet; 
only the state stores are initialized. So we only need to call 
`task.closeStateManager`.
    * Also add @linyli001's fix.
    * Unit tests updated accordingly.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>,  John Roesler 
<j...@confluent.io>
---
 .../streams/processor/internals/AbstractTask.java  |  1 -
 .../processor/internals/AssignedStreamsTasks.java  | 89 +++++++++++++---------
 .../streams/processor/internals/AssignedTasks.java | 13 ++--
 .../processor/internals/StoreChangelogReader.java  |  1 +
 .../streams/processor/internals/TaskManager.java   | 11 ++-
 .../internals/AssignedStreamsTasksTest.java        |  6 +-
 .../processor/internals/TaskManagerTest.java       | 12 ---
 7 files changed, 73 insertions(+), 60 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8eb024c..1838eb7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -243,7 +243,6 @@ public abstract class AbstractTask implements Task {
      * @param writeCheckpoint boolean indicating if a checkpoint file should 
be written
      * @throws ProcessorStateException if there is an error while closing the 
state manager
      */
-    // visible for testing
     void closeStateManager(final boolean writeCheckpoint) throws 
ProcessorStateException {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 697482c..f0019ec 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -30,7 +30,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements 
RestoringTasks {
     private final Map<TaskId, StreamTask> restoring = new HashMap<>();
@@ -46,6 +45,52 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> 
implements Restorin
         return restoringByPartition.get(partition);
     }
 
+    @Override
+    List<StreamTask> allTasks() {
+        final List<StreamTask> tasks = super.allTasks();
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    @Override
+    Set<TaskId> allAssignedTaskIds() {
+        final Set<TaskId> taskIds = super.allAssignedTaskIds();
+        taskIds.addAll(restoring.keySet());
+        return taskIds;
+    }
+
+    @Override
+    boolean allTasksRunning() {
+        return super.allTasksRunning() && restoring.isEmpty();
+    }
+
+    RuntimeException closeAllRestoringTasks() {
+        RuntimeException exception = null;
+
+        log.trace("Closing all restoring stream tasks {}", restoring.keySet());
+        final Iterator<StreamTask> restoringTaskIterator = 
restoring.values().iterator();
+        while (restoringTaskIterator.hasNext()) {
+            final StreamTask task = restoringTaskIterator.next();
+            log.debug("Closing restoring task {}", task.id());
+            try {
+                task.closeStateManager(true);
+            } catch (final RuntimeException e) {
+                log.error("Failed to remove restoring task {} due to the 
following error:", task.id(), e);
+                if (exception == null) {
+                    exception = e;
+                }
+            } finally {
+                restoringTaskIterator.remove();
+            }
+        }
+
+        restoring.clear();
+        restoredPartitions.clear();
+        restoringByPartition.clear();
+
+        return exception;
+    }
+
     void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
             return;
@@ -86,20 +131,6 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         }
     }
 
-    @Override
-    boolean allTasksRunning() {
-        return super.allTasksRunning() && restoring.isEmpty();
-    }
-
-    RuntimeException suspend() {
-        final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(super.suspend());
-        log.trace("Close restoring stream task {}", restoring.keySet());
-        firstException.compareAndSet(null, 
closeNonRunningTasks(restoring.values()));
-        restoring.clear();
-        restoringByPartition.clear();
-        return firstException.get();
-    }
-
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -218,27 +249,6 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         return punctuated;
     }
 
-    public String toString(final String indent) {
-        final StringBuilder builder = new StringBuilder();
-        builder.append(super.toString(indent));
-        describe(builder, restoring.values(), indent, "Restoring:");
-        return builder.toString();
-    }
-
-    @Override
-    List<StreamTask> allTasks() {
-        final List<StreamTask> tasks = super.allTasks();
-        tasks.addAll(restoring.values());
-        return tasks;
-    }
-
-    @Override
-    Set<TaskId> allAssignedTaskIds() {
-        final Set<TaskId> taskIds = super.allAssignedTaskIds();
-        taskIds.addAll(restoring.keySet());
-        return taskIds;
-    }
-
     void clear() {
         super.clear();
         restoring.clear();
@@ -246,6 +256,13 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         restoredPartitions.clear();
     }
 
+    public String toString(final String indent) {
+        final StringBuilder builder = new StringBuilder();
+        builder.append(super.toString(indent));
+        describe(builder, restoring.values(), indent, "Restoring:");
+        return builder.toString();
+    }
+
     // for testing only
 
     Collection<StreamTask> restoringTasks() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a33ecdc..a9baa3f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -84,8 +84,7 @@ abstract class AssignedTasks<T extends Task> {
     }
 
     boolean allTasksRunning() {
-        return created.isEmpty()
-                && suspended.isEmpty();
+        return created.isEmpty() && suspended.isEmpty();
     }
 
     Collection<T> running() {
@@ -106,7 +105,7 @@ abstract class AssignedTasks<T extends Task> {
         return firstException.get();
     }
 
-    RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
+    private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
         RuntimeException exception = null;
         for (final T task : tasks) {
             try {
@@ -167,7 +166,7 @@ abstract class AssignedTasks<T extends Task> {
     boolean maybeResumeSuspendedTask(final TaskId taskId, final 
Set<TopicPartition> partitions) {
         if (suspended.containsKey(taskId)) {
             final T task = suspended.get(taskId);
-            log.trace("found suspended {} {}", taskTypeName, taskId);
+            log.trace("Found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
                 task.resume();
@@ -185,10 +184,10 @@ abstract class AssignedTasks<T extends Task> {
                     }
                     throw e;
                 }
-                log.trace("resuming suspended {} {}", taskTypeName, task.id());
+                log.trace("Resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
-                log.warn("couldn't resume task {} assigned partitions {}, task 
partitions {}", taskId, partitions, task.partitions());
+                log.warn("Couldn't resume task {} assigned partitions {}, task 
partitions {}", taskId, partitions, task.partitions());
             }
         }
         return false;
@@ -198,7 +197,7 @@ abstract class AssignedTasks<T extends Task> {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void transitionToRunning(final T task) {
-        log.debug("transitioning {} {} to running", taskTypeName, task.id());
+        log.debug("Transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
         for (final TopicPartition topicPartition : task.partitions()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index fdd9d6c..59fb36f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -277,6 +277,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
         needsRestoring.clear();
         endOffsets.clear();
         needsInitializing.clear();
+        completedRestorers.clear();
     }
 
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> 
records,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9cc5a19..455d226 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -99,11 +99,11 @@ public class TaskManager {
             throw new IllegalStateException(logPrefix + "consumer has not been 
initialized while adding stream tasks. This should not happen.");
         }
 
-        changelogReader.reset();
         // do this first as we may have suspended standby tasks that
         // will become active or vice versa
         standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
+
         addStreamTasks(assignment);
         addStandbyTasks();
         // Pause all the partitions until the underlying state store is ready 
for all the active tasks.
@@ -240,7 +240,14 @@ public class TaskManager {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
         firstException.compareAndSet(null, active.suspend());
+        // close all restoring tasks as well and then reset changelog reader;
+        // for those restoring and still assigned tasks, they will be 
re-created
+        // in addStreamTasks.
+        firstException.compareAndSet(null, active.closeAllRestoringTasks());
+        changelogReader.reset();
+
         firstException.compareAndSet(null, standby.suspend());
+
         // remove the changelog partitions from restore consumer
         restoreConsumer.unsubscribe();
 
@@ -368,7 +375,7 @@ public class TaskManager {
     }
 
     public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> 
activeTasks,
-                               final Map<TaskId, Set<TopicPartition>> 
standbyTasks) {
+                                      final Map<TaskId, Set<TopicPartition>> 
standbyTasks) {
         this.assignedActiveTasks = activeTasks;
         this.assignedStandbyTasks = standbyTasks;
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fe71135..ffd0f8b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -146,11 +146,13 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
-        t1.close(false, false);
+        t1.closeStateManager(true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-        assertThat(suspendTask(), nullValue());
+        addAndInitTask();
+        assertThat(assignedTasks.closeAllRestoringTasks(), nullValue());
+
         EasyMock.verify(t1);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00af100..f71d7a1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -261,18 +261,6 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldResetChangeLogReaderOnCreateTasks() {
-        mockSingleActiveTask();
-        changeLogReader.reset();
-        EasyMock.expectLastCall();
-        replay();
-
-        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        taskManager.createTasks(taskId0Partitions);
-        verify(changeLogReader);
-    }
-
-    @Test
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
         EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, 
taskId0Partitions)).andReturn(false);

Reply via email to