This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 11a406e [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
11a406e is described below

commit 11a406e67057ca9260c16c08054c209e3452a291
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jan 19 15:22:51 2022 +0100

    [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe

    The method is called from the shutdown hook and must be thread-safe.
---
 .../TaskExecutorStateChangelogStoragesManager.java | 100 ++++++++++++---------
 1 file changed, 57 insertions(+), 43 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 1af6e35..934eff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -29,16 +29,16 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-/**
- * This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). No
- * thread-safe.
- */
+/** This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). */
+@ThreadSafe
 public class TaskExecutorStateChangelogStoragesManager {
 
     /** Logger for this class. */
@@ -50,10 +50,14 @@ public class TaskExecutorStateChangelogStoragesManager {
      * that own the instance of this. Maps from job id to all the subtask's state changelog
      * storages. Value type Optional is for containing the null value.
      */
+    @GuardedBy("lock")
     private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId;
 
+    @GuardedBy("lock")
     private boolean closed;
 
+    private final Object lock = new Object();
+
     /** shutdown hook for this manager. */
     private final Thread shutdownHook;
 
@@ -69,63 +73,73 @@ public class TaskExecutorStateChangelogStoragesManager {
     @Nullable
     public StateChangelogStorage<?> stateChangelogStorageForJob(
             @Nonnull JobID jobId, Configuration configuration) throws IOException {
-        if (closed) {
-            throw new IllegalStateException(
-                    "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
-                            + "register a new StateChangelogStorage.");
-        }
-
-        Optional<StateChangelogStorage<?>> stateChangelogStorage =
-                changelogStoragesByJobId.get(jobId);
-
-        if (stateChangelogStorage == null) {
-            StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
-            stateChangelogStorage = Optional.ofNullable(loaded);
-            changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+        synchronized (lock) {
+            if (closed) {
+                throw new IllegalStateException(
+                        "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
+                                + "register a new StateChangelogStorage.");
+            }
 
-            if (loaded != null) {
-                LOG.debug("Registered new state changelog storage for job {} : {}.", jobId, loaded);
+            Optional<StateChangelogStorage<?>> stateChangelogStorage =
+                    changelogStoragesByJobId.get(jobId);
+
+            if (stateChangelogStorage == null) {
+                StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
+                stateChangelogStorage = Optional.ofNullable(loaded);
+                changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+
+                if (loaded != null) {
+                    LOG.debug(
+                            "Registered new state changelog storage for job {} : {}.",
+                            jobId,
+                            loaded);
+                } else {
+                    LOG.info(
+                            "Try to registered new state changelog storage for job {},"
+                                    + " but result is null.",
+                            jobId);
+                }
+            } else if (stateChangelogStorage.isPresent()) {
+                LOG.debug(
+                        "Found existing state changelog storage for job {}: {}.",
+                        jobId,
+                        stateChangelogStorage.get());
             } else {
-                LOG.info(
-                        "Try to registered new state changelog storage for job {},"
-                                + " but result is null.",
+                LOG.debug(
+                        "Found a previously loaded NULL state changelog storage for job {}.",
                         jobId);
             }
-        } else if (stateChangelogStorage.isPresent()) {
-            LOG.debug(
-                    "Found existing state changelog storage for job {}: {}.",
-                    jobId,
-                    stateChangelogStorage.get());
-        } else {
-            LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobId);
-        }
 
-        return stateChangelogStorage.orElse(null);
+            return stateChangelogStorage.orElse(null);
+        }
     }
 
     public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
         LOG.debug("Releasing state changelog storage under job id {}.", jobId);
-        if (closed) {
-            return;
+        Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            cleanupChangelogStorage = changelogStoragesByJobId.remove(jobId);
         }
 
-        Optional<StateChangelogStorage<?>> cleanupChangelogStorage =
-                changelogStoragesByJobId.remove(jobId);
-
         if (cleanupChangelogStorage != null) {
             cleanupChangelogStorage.ifPresent(this::doRelease);
         }
     }
 
     public void shutdown() {
-        if (closed) {
-            return;
-        }
-        closed = true;
+        HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            closed = true;
 
-        HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease =
-                new HashMap<>(changelogStoragesByJobId);
-        changelogStoragesByJobId.clear();
+            toRelease = new HashMap<>(changelogStoragesByJobId);
+            changelogStoragesByJobId.clear();
+        }
 
         ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 