Repository: flink
Updated Branches:
  refs/heads/release-1.4 1a852ecfe -> 27a39de0f


[FLINK-9693] Set Execution#taskState to null after deployment

Setting the assigned Execution#taskState to null after the deployment allows the
TaskStateSnapshot instance to be garbage collected. Furthermore, it won't be
archived along with the Execution in the ExecutionVertex in case of a restart.
This is especially important when setting state.backend.fs.memory-threshold to
larger values, because every state below this threshold will be stored in the
meta state files and, thus, also in the TaskStateSnapshot instances.

This closes #6251.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27a39de0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27a39de0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27a39de0

Branch: refs/heads/release-1.4
Commit: 27a39de0f2c47068f3e3aaf2574a0bd6a757e08d
Parents: 1a852ec
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Jul 4 11:05:25 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Jul 12 14:06:58 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   3 +
 .../runtime/executiongraph/ExecutionTest.java   | 151 +++++++++++++++++--
 2 files changed, 145 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27a39de0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 38c3821..48b9f8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -514,6 +514,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
                                taskState,
                                attemptNumber);
 
+                       // null taskState to let it be GC'ed
+                       taskState = null;
+
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
 
                        final CompletableFuture<Acknowledge> submitResultFuture 
= taskManagerGateway.submitTask(deployment, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/27a39de0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index fa845cf..e616628 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -31,17 +32,24 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -52,15 +60,16 @@ import static org.junit.Assert.assertTrue;
  */
 public class ExecutionTest extends TestLogger {
 
+       public static final JobID JOB_ID = new JobID();
+
        /**
         * Tests that slots are released if we cannot assign the allocated 
resource to the
         * Execution. In this case, a concurrent cancellation precedes the 
assignment.
         */
        @Test
        public void testSlotReleaseOnFailedResourceAssignment() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final CompletableFuture<SimpleSlot> slotFuture = new 
CompletableFuture<>();
                final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
@@ -112,9 +121,8 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testSlotReleaseOnExecutionCancellationInScheduled() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final TestingSlotOwner slotOwner = new TestingSlotOwner();
 
@@ -162,9 +170,8 @@ public class ExecutionTest extends TestLogger {
         */
        @Test
        public void testSlotReleaseOnExecutionCancellationInRunning() throws 
Exception {
-               final JobVertexID jobVertexId = new JobVertexID();
-               final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
-               jobVertex.setInvokableClass(NoOpInvokable.class);
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
 
                final TestingSlotOwner slotOwner = new TestingSlotOwner();
 
@@ -268,6 +275,132 @@ public class ExecutionTest extends TestLogger {
        }
 
        /**
+        * Checks that the {@link Execution} termination future is only 
completed after the
+        * assigned slot has been released.
+        *
+        * <p>NOTE: This test only fails spuriously without the fix of this 
commit. Thus, one has
+        * to execute this test multiple times to see the failure.
+        */
+       @Test
+       public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
+
+               final TestingSlotOwner slotOwner = new TestingSlotOwner();
+               final ProgrammedSlotProvider slotProvider = 
createProgrammedSlotProvider(
+                       1,
+                       Collections.singleton(jobVertexId),
+                       slotOwner);
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY);
+
+               Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
+
+               CompletableFuture<Slot> returnedSlotFuture = 
slotOwner.getReturnedSlotFuture();
+               CompletableFuture<?> terminationFuture = 
executionVertex.cancel();
+
+               // run canceling in a separate thread to allow an interleaving 
between termination
+               // future callback registrations
+               CompletableFuture.runAsync(
+                       () -> currentExecutionAttempt.cancelingComplete(),
+                       TestingUtils.defaultExecutor());
+
+               // to increase probability for problematic interleaving, let 
the current thread yield the processor
+               Thread.yield();
+
+               CompletableFuture<Boolean> restartFuture = 
terminationFuture.thenApply(
+                       ignored -> {
+                               assertTrue(returnedSlotFuture.isDone());
+                               return true;
+                       });
+
+
+               // check if the returned slot future was completed first
+               restartFuture.get();
+       }
+
+       /**
+        * Tests that the task restore state is nulled after the {@link 
Execution} has been
+        * deployed. See FLINK-9693.
+        */
+       @Test
+       public void testTaskRestoreStateIsNulledAfterDeployment() throws 
Exception {
+               final JobVertex jobVertex = createNoOpJobVertex();
+               final JobVertexID jobVertexId = jobVertex.getID();
+
+               final TestingSlotOwner slotOwner = new TestingSlotOwner();
+               final ProgrammedSlotProvider slotProvider = 
createProgrammedSlotProvider(
+                       1,
+                       Collections.singleton(jobVertexId),
+                       slotOwner);
+
+               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       jobVertex);
+
+               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+               ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+               final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
+
+               final TaskStateSnapshot taskStateSnapshot = new 
TaskStateSnapshot();
+               execution.setInitialState(taskStateSnapshot);
+
+               assertThat(execution.getTaskStateSnapshot(), 
is(notNullValue()));
+
+               // schedule the execution vertex and wait for its deployment
+               executionVertex.scheduleForExecution(slotProvider, false, 
LocationPreferenceConstraint.ANY);
+
+               assertThat(execution.getTaskStateSnapshot(), is(nullValue()));
+       }
+
+       @Nonnull
+       private JobVertex createNoOpJobVertex() {
+               final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               return jobVertex;
+       }
+
+       @Nonnull
+       private ProgrammedSlotProvider createProgrammedSlotProvider(
+               int parallelism,
+               Collection<JobVertexID> jobVertexIds,
+               SlotOwner slotOwner) {
+               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(parallelism);
+
+               for (JobVertexID jobVertexId : jobVertexIds) {
+                       for (int i = 0; i < parallelism; i++) {
+                               final SimpleSlot slot = new SimpleSlot(
+                                       JOB_ID,
+                                       slotOwner,
+                                       new LocalTaskManagerLocation(),
+                                       0,
+                                       new SimpleAckingTaskManagerGateway(),
+                                       null,
+                                       null);
+
+                               slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+                       }
+               }
+
+               return slotProvider;
+       }
+
+       /**
         * Slot owner which records the first returned slot.
         */
        public static final class TestingSlotOwner implements SlotOwner {

Reply via email to