This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e22a9c67587abae24dec3d8f76c46a9475230a94 Author: feyman2016 <feyman2...@aliyun.com> AuthorDate: Tue Jun 23 06:54:35 2020 +0800 KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (#8887) Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager --- .../streams/processor/internals/AbstractTask.java | 16 ------- .../streams/processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StreamTask.java | 4 +- .../streams/processor/internals/TaskManager.java | 54 ++++++++++++++-------- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index f59571b..044825a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.TaskId; import java.util.Collection; import java.util.Set; -import org.slf4j.Logger; import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED; import static org.apache.kafka.streams.processor.internals.Task.State.CREATED; @@ -104,21 +103,6 @@ public abstract class AbstractTask implements Task { } } - static void executeAndMaybeSwallow(final boolean clean, - final Runnable runnable, - final String name, - final Logger log) { - try { - runnable.run(); - } catch (final RuntimeException e) { - if (clean) { - throw e; - } else { - log.debug("Ignoring error in unclean {}", name); - } - } - } - @Override public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) { this.inputPartitions = topicPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index b334bc1..ffd09f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -206,7 +206,7 @@ public class StandbyTask extends AbstractTask implements Task { private void close(final boolean clean) { switch (state()) { case SUSPENDED: - executeAndMaybeSwallow( + TaskManager.executeAndMaybeSwallow( clean, () -> StateManagerUtil.closeStateManager( log, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index eb1bf4b..d6c6ea6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -528,7 +528,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, case SUSPENDED: // first close state manager (which is idempotent) then close the record collector // if the latter throws and we re-close dirty which would close the state manager again. - executeAndMaybeSwallow( + TaskManager.executeAndMaybeSwallow( clean, () -> StateManagerUtil.closeStateManager( log, @@ -542,7 +542,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, "state manager close", log); - executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log); + TaskManager.executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log); break; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b90ed5f..7759d7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -735,29 +735,23 @@ public class TaskManager { for (final Task task : tasks.values()) { if (task.isActive()) { - try { - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); - } catch (final RuntimeException e) { - if (clean) { - firstException.compareAndSet(null, e); - } else { - log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e); - } - } + executeAndMaybeSwallow( + clean, + () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()), + e -> firstException.compareAndSet(null, e), + e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e) + ); } } tasks.clear(); - try { - activeTaskCreator.closeThreadProducerIfNeeded(); - } catch (final RuntimeException e) { - if (clean) { - firstException.compareAndSet(null, e); - } else { - log.warn("Ignoring an exception while closing thread producer.", e); - } - } + executeAndMaybeSwallow( + clean, + activeTaskCreator::closeThreadProducerIfNeeded, + e -> firstException.compareAndSet(null, e), + e -> log.warn("Ignoring an exception while closing thread producer.", e) + ); try { // this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may @@ -1042,4 +1036,28 @@ public class TaskManager { Set<TaskId> lockedTaskDirectories() { return Collections.unmodifiableSet(lockedTaskDirectories); } + + public static void executeAndMaybeSwallow(final boolean clean, + final Runnable runnable, + final java.util.function.Consumer<RuntimeException> actionIfClean, + final java.util.function.Consumer<RuntimeException> actionIfNotClean) { + try { + runnable.run(); + } catch (final RuntimeException e) { + if (clean) { + actionIfClean.accept(e); + } else { + actionIfNotClean.accept(e); + } + } + } + + public static void executeAndMaybeSwallow(final boolean clean, + final Runnable runnable, + final String name, + final Logger log) { + executeAndMaybeSwallow(clean, runnable, e -> { + throw e; }, + e -> log.debug("Ignoring error in unclean {}", name)); + } }