[hotfix] Introduce NoOpTaskLocalStateStoreImpl that is used as store if local 
recovery is disabled

This implementation will no go through all the registration/lookup steps or a 
normal state store, beause
they are not required if local recovery is disabled.

(cherry picked from commit 2bc1eaa)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57887cd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57887cd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57887cd9

Branch: refs/heads/release-1.5
Commit: 57887cd94d238c4d6239d5e4028a1d7ff341cb75
Parents: 8a8b6c4
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Mar 13 10:49:33 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 10:07:44 2018 +0200

----------------------------------------------------------------------
 .../state/NoOpTaskLocalStateStoreImpl.java      | 71 ++++++++++++++++++++
 .../runtime/state/OwnedTaskLocalStateStore.java | 38 +++++++++++
 .../TaskExecutorLocalStateStoresManager.java    | 36 +++++-----
 .../runtime/state/TaskLocalStateStore.java      |  2 +
 .../runtime/state/TaskLocalStateStoreImpl.java  |  3 +-
 5 files changed, 134 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57887cd9/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
new file mode 100644
index 0000000..11841a1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongPredicate;
+
+/**
+ * This class implements a {@link TaskLocalStateStore} with no functionality 
and is used when local recovery is
+ * disabled.
+ */
+public final class NoOpTaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
+
+       /** The configuration for local recovery. */
+       @Nonnull
+       private final LocalRecoveryConfig localRecoveryConfig;
+
+       NoOpTaskLocalStateStoreImpl(@Nonnull LocalRecoveryConfig 
localRecoveryConfig) {
+               this.localRecoveryConfig = localRecoveryConfig;
+       }
+
+       @Nonnull
+       @Override
+       public LocalRecoveryConfig getLocalRecoveryConfig() {
+               return localRecoveryConfig;
+       }
+
+       @Override
+       public CompletableFuture<Void> dispose() {
+               return CompletableFuture.completedFuture(null);
+       }
+
+       @Override
+       public void storeLocalState(long checkpointId, @Nullable 
TaskStateSnapshot localState) {
+       }
+
+       @Nullable
+       @Override
+       public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+               return null;
+       }
+
+       @Override
+       public void confirmCheckpoint(long confirmedCheckpointId) {
+       }
+
+       @Override
+       public void pruneMatchingCheckpoints(LongPredicate matcher) {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57887cd9/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
new file mode 100644
index 0000000..b73626c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OwnedTaskLocalStateStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This interface represents the administrative interface to {@link 
TaskLocalStateStore}, that only the owner of the
+ * object should see. All clients that want to use the service should only see 
the {@link TaskLocalStateStore}
+ * interface.
+ */
+@Internal
+public interface OwnedTaskLocalStateStore extends TaskLocalStateStore {
+
+       /**
+        * Disposes the task local state store. Disposal can happen 
asynchronously and completion is signaled through the
+        * returned future.
+        */
+       CompletableFuture<Void> dispose();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57887cd9/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 4919f80..cb3b680 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -51,7 +51,7 @@ public class TaskExecutorLocalStateStoresManager {
         * this. Maps from allocation id to all the subtask's local state 
stores.
         */
        @GuardedBy("lock")
-       private final Map<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
+       private final Map<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>> taskStateStoresByAllocationID;
 
        /** The configured mode for local recovery on this task manager. */
        private final boolean localRecoveryEnabled;
@@ -111,7 +111,7 @@ public class TaskExecutorLocalStateStoresManager {
                                        "register a new TaskLocalStateStore.");
                        }
 
-                       Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
taskStateManagers =
+                       Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> 
taskStateManagers =
                                
this.taskStateStoresByAllocationID.get(allocationID);
 
                        if (taskStateManagers == null) {
@@ -126,7 +126,7 @@ public class TaskExecutorLocalStateStoresManager {
 
                        final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
 
-                       TaskLocalStateStoreImpl taskLocalStateStore = 
taskStateManagers.get(taskKey);
+                       OwnedTaskLocalStateStore taskLocalStateStore = 
taskStateManagers.get(taskKey);
 
                        if (taskLocalStateStore == null) {
 
@@ -142,13 +142,19 @@ public class TaskExecutorLocalStateStoresManager {
                                LocalRecoveryConfig localRecoveryConfig =
                                        new 
LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
 
-                               taskLocalStateStore = new 
TaskLocalStateStoreImpl(
-                                       jobId,
-                                       allocationID,
-                                       jobVertexID,
-                                       subtaskIndex,
-                                       localRecoveryConfig,
-                                       discardExecutor);
+                               taskLocalStateStore = (localRecoveryMode != 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+
+                                               // Real store implementation if 
local recovery is enabled
+                                               new TaskLocalStateStoreImpl(
+                                                       jobId,
+                                                       allocationID,
+                                                       jobVertexID,
+                                                       subtaskIndex,
+                                                       localRecoveryConfig,
+                                                       discardExecutor) :
+
+                                               // NOP implementation if local 
recovery is disabled
+                                               new 
NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
 
                                taskStateManagers.put(taskKey, 
taskLocalStateStore);
 
@@ -173,7 +179,7 @@ public class TaskExecutorLocalStateStoresManager {
                        LOG.debug("Releasing local state under allocation id 
{}.", allocationID);
                }
 
-               Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
cleanupLocalStores;
+               Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> 
cleanupLocalStores;
 
                synchronized (lock) {
                        if (closed) {
@@ -191,7 +197,7 @@ public class TaskExecutorLocalStateStoresManager {
 
        public void shutdown() {
 
-               HashMap<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> toRelease;
+               HashMap<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>> toRelease;
 
                synchronized (lock) {
 
@@ -208,7 +214,7 @@ public class TaskExecutorLocalStateStoresManager {
 
                LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
 
-               for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> entry :
+               for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, 
OwnedTaskLocalStateStore>> entry :
                        toRelease.entrySet()) {
 
                        doRelease(entry.getValue().values());
@@ -240,11 +246,11 @@ public class TaskExecutorLocalStateStoresManager {
                return allocationDirectories;
        }
 
-       private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) {
+       private void doRelease(Iterable<OwnedTaskLocalStateStore> toRelease) {
 
                if (toRelease != null) {
 
-                       for (TaskLocalStateStoreImpl stateStore : toRelease) {
+                       for (OwnedTaskLocalStateStore stateStore : toRelease) {
                                try {
                                        stateStore.dispose();
                                } catch (Exception disposeEx) {

http://git-wip-us.apache.org/repos/asf/flink/blob/57887cd9/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index 686f4f6..b0d8a82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 
 import javax.annotation.Nonnegative;
@@ -33,6 +34,7 @@ import java.util.function.LongPredicate;
  * state is typically lost in case of machine failures. In such cases (and 
others), client code of this class must fall
  * back to using the slower but highly available store.
  */
+@Internal
 public interface TaskLocalStateStore {
        /**
         * Stores the local state for the given checkpoint id.

http://git-wip-us.apache.org/repos/asf/flink/blob/57887cd9/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 29adc4a..9d105e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -53,7 +53,7 @@ import java.util.function.LongPredicate;
 /**
  * Main implementation of a {@link TaskLocalStateStore}.
  */
-public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
 
        /** Logger for this class. */
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
@@ -232,6 +232,7 @@ public class TaskLocalStateStoreImpl implements 
TaskLocalStateStore {
        /**
         * Disposes the state of all local snapshots managed by this object.
         */
+       @Override
        public CompletableFuture<Void> dispose() {
 
                Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;

Reply via email to