Repository: flink Updated Branches: refs/heads/release-1.4 1a852ecfe -> 27a39de0f
[FLINK-9693] Set Execution#taskState to null after deployment Setting the assigned Execution#taskState to null after the deployment allows the TaskStateSnapshot instance to be garbage collected. Furthermore, the snapshot won't be archived along with the Execution in the ExecutionVertex in case of a restart. This is especially important when setting state.backend.fs.memory-threshold to larger values because every state below this threshold will be stored in the meta state files and, thus, also in the TaskStateSnapshot instances. This closes #6251. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27a39de0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27a39de0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27a39de0 Branch: refs/heads/release-1.4 Commit: 27a39de0f2c47068f3e3aaf2574a0bd6a757e08d Parents: 1a852ec Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Jul 4 11:05:25 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Jul 12 14:06:58 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 3 + .../runtime/executiongraph/ExecutionTest.java | 151 +++++++++++++++++-- 2 files changed, 145 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/27a39de0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 38c3821..48b9f8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ 
-514,6 +514,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution taskState, attemptNumber); + // null taskState to let it be GC'ed + taskState = null; + final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/27a39de0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index fa845cf..e616628 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; @@ -31,17 +32,24 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -52,15 +60,16 @@ import static org.junit.Assert.assertTrue; */ public class ExecutionTest extends TestLogger { + public static final JobID JOB_ID = new JobID(); + /** * Tests that slots are released if we cannot assign the allocated resource to the * Execution. In this case, a concurrent cancellation precedes the assignment. */ @Test public void testSlotReleaseOnFailedResourceAssignment() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>(); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); @@ -112,9 +121,8 @@ public class ExecutionTest extends TestLogger { */ @Test public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final TestingSlotOwner slotOwner = new TestingSlotOwner(); @@ -162,9 +170,8 @@ public class ExecutionTest extends TestLogger { */ @Test public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception { - final JobVertexID jobVertexId = new JobVertexID(); - final JobVertex 
jobVertex = new JobVertex("Test vertex", jobVertexId); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); final TestingSlotOwner slotOwner = new TestingSlotOwner(); @@ -268,6 +275,132 @@ public class ExecutionTest extends TestLogger { } /** + * Checks that the {@link Execution} termination future is only completed after the + * assigned slot has been released. + * + * <p>NOTE: This test only fails spuriously without the fix of this commit. Thus, one has + * to execute this test multiple times to see the failure. + */ + @Test + public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception { + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider( + 1, + Collections.singleton(jobVertexId), + slotOwner); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY); + + Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); + + CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture(); + CompletableFuture<?> terminationFuture = executionVertex.cancel(); + + // run canceling in a separate thread to allow an interleaving between termination + // future callback registrations + CompletableFuture.runAsync( + () -> currentExecutionAttempt.cancelingComplete(), + TestingUtils.defaultExecutor()); + + // to increase probability for problematic interleaving, let the current 
thread yield the processor + Thread.yield(); + + CompletableFuture<Boolean> restartFuture = terminationFuture.thenApply( + ignored -> { + assertTrue(returnedSlotFuture.isDone()); + return true; + }); + + + // check if the returned slot future was completed first + restartFuture.get(); + } + + /** + * Tests that the task restore state is nulled after the {@link Execution} has been + * deployed. See FLINK-9693. + */ + @Test + public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception { + final JobVertex jobVertex = createNoOpJobVertex(); + final JobVertexID jobVertexId = jobVertex.getID(); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider( + 1, + Collections.singleton(jobVertexId), + slotOwner); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + + final TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + execution.setInitialState(taskStateSnapshot); + + assertThat(execution.getTaskStateSnapshot(), is(notNullValue())); + + // schedule the execution vertex and wait for its deployment + executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY); + + assertThat(execution.getTaskStateSnapshot(), is(nullValue())); + } + + @Nonnull + private JobVertex createNoOpJobVertex() { + final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID()); + jobVertex.setInvokableClass(NoOpInvokable.class); + + return jobVertex; + } + + @Nonnull + private ProgrammedSlotProvider createProgrammedSlotProvider( + int parallelism, + Collection<JobVertexID> jobVertexIds, + SlotOwner slotOwner) 
{ + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + + for (JobVertexID jobVertexId : jobVertexIds) { + for (int i = 0; i < parallelism; i++) { + final SimpleSlot slot = new SimpleSlot( + JOB_ID, + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway(), + null, + null); + + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + } + } + + return slotProvider; + } + + /** * Slot owner which records the first returned slot. */ public static final class TestingSlotOwner implements SlotOwner {