This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dfd74a2e42bc06bbf85572225b902caf949b2aa3 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Jun 6 11:10:01 2019 +0200 [FLINK-12683][chkpt] Simplify task manager location info generation for logging statements This closes #8580. --- .../runtime/checkpoint/CheckpointCoordinator.java | 13 +-- .../flink/runtime/scheduler/LegacyScheduler.java | 20 ++-- .../CheckpointCoordinatorFailureTest.java | 5 +- .../CheckpointCoordinatorMasterHooksTest.java | 3 +- .../checkpoint/CheckpointCoordinatorTest.java | 129 +++++++++++---------- .../checkpoint/CheckpointStateRestoreTest.java | 13 ++- ...ncurrentFailoverStrategyExecutionGraphTest.java | 3 +- .../runtime/executiongraph/FailoverRegionTest.java | 3 +- 8 files changed, 92 insertions(+), 97 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0b6a8ad..b6f5a81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -724,15 +723,13 @@ public class CheckpointCoordinator { * Receives a {@link DeclineCheckpoint} message for a pending checkpoint. * * @param message Checkpoint decline from the task manager - * @param taskManagerLocation The location of the decline checkpoint message's sender + * @param taskManagerLocationInfo The location info of the decline checkpoint message's sender */ - public void receiveDeclineMessage(DeclineCheckpoint message, @Nullable TaskManagerLocation taskManagerLocation) { + public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerLocationInfo) { if (shutdown || message == null) { return; } - final String taskManagerLocationInfo = (taskManagerLocation != null ? taskManagerLocation.toString() : "unknown location"); - if (!job.equals(message.getJob())) { throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + message.getJob() + " from " + taskManagerLocationInfo + " while this coordinator handles job " + job); @@ -785,19 +782,17 @@ public class CheckpointCoordinator { * * @param message Checkpoint ack from the task manager * - * @param taskManagerLocation The location of the acknowledge checkpoint message's sender + * @param taskManagerLocationInfo The location of the acknowledge checkpoint message's sender * @return Flag indicating whether the ack'd checkpoint was associated * with a pending checkpoint. * * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store. */ - public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, @Nullable TaskManagerLocation taskManagerLocation) throws CheckpointException { + public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException { if (shutdown || message == null) { return false; } - final String taskManagerLocationInfo = (taskManagerLocation != null ? taskManagerLocation.toString() : "unknown location"); - if (!job.equals(message.getJob())) { LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message); return false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java index 24682be..2cbe7d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java @@ -528,13 +528,12 @@ public class LegacyScheduler implements SchedulerNG { checkpointMetrics, checkpointState); - final Optional<TaskManagerLocation> optionalLocation = tryRetrieveTaskManagerLocation(executionAttemptID); - final TaskManagerLocation taskManagerLocation = optionalLocation.isPresent() ? optionalLocation.get() : null; + final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID); if (checkpointCoordinator != null) { ioExecutor.execute(() -> { try { - checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocation); + checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo); } catch (Throwable t) { log.warn("Error while processing checkpoint acknowledgement message", t); } @@ -554,13 +553,12 @@ public class LegacyScheduler implements SchedulerNG { mainThreadExecutor.assertRunningInMainThread(); final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - final Optional<TaskManagerLocation> optionalLocation = tryRetrieveTaskManagerLocation(decline.getTaskExecutionId()); - final TaskManagerLocation taskManagerLocation = optionalLocation.isPresent() ? optionalLocation.get() : null; + final String taskManagerLocationInfo = retrieveTaskManagerLocation(decline.getTaskExecutionId()); if (checkpointCoordinator != null) { ioExecutor.execute(() -> { try { - checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocation); + checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo); } catch (Exception e) { log.error("Error in CheckpointCoordinator while processing {}", decline, e); } @@ -631,8 +629,12 @@ public class LegacyScheduler implements SchedulerNG { terminationFuture.thenApply((jobStatus -> path))); } - private Optional<TaskManagerLocation> tryRetrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) { - final Execution currentExecution = executionGraph.getRegisteredExecutions().get(executionAttemptID); - return currentExecution != null ? Optional.ofNullable(currentExecution.getAssignedResourceLocation()) : Optional.empty(); + private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) { + final Optional<Execution> currentExecution = Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID)); + + return currentExecution + .map(Execution::getAssignedResourceLocation) + .map(TaskManagerLocation::toString) + .orElse("Unknown location"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 16f31c6..1709fae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -25,12 +25,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -113,7 +112,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState); try { - coord.receiveAcknowledgeMessage(acknowledgeMessage, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeMessage, "Unknown location"); fail("Expected a checkpoint exception because the completed checkpoint store could not " + "store the completed checkpoint."); } catch (CheckpointException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 5c9b857..762b805 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -203,7 +202,7 @@ public class CheckpointCoordinatorMasterHooksTest { verify(statelessHook, times(1)).triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)); final long checkpointId = cc.getPendingCheckpoints().values().iterator().next().getCheckpointId(); - cc.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, execId, checkpointId), new LocalTaskManagerLocation()); + cc.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, execId, checkpointId), "Unknown location"); assertEquals(0, cc.getNumberOfPendingCheckpoints()); assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 8d9c6a8..1676b01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -51,7 +51,6 @@ import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; import org.apache.flink.util.InstantiationUtil; @@ -116,6 +115,8 @@ import static org.mockito.Mockito.when; */ public class CheckpointCoordinatorTest extends TestLogger { + private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location"; + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -353,21 +354,21 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation()); // acknowledge from one of the tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location"); assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks()); assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location"); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); // decline checkpoint from the other task, this should cancel the checkpoint // and trigger a new one - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), new LocalTaskManagerLocation()); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint.isDiscarded()); // the canceler is also removed @@ -379,8 +380,8 @@ public class CheckpointCoordinatorTest extends TestLogger { // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), new LocalTaskManagerLocation()); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId), new LocalTaskManagerLocation()); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint.isDiscarded()); coord.shutdown(JobStatus.FINISHED); @@ -480,7 +481,7 @@ public class CheckpointCoordinatorTest extends TestLogger { } // decline checkpoint from one of the tasks, this should cancel the checkpoint - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), new LocalTaskManagerLocation()); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left @@ -505,8 +506,8 @@ public class CheckpointCoordinatorTest extends TestLogger { // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), new LocalTaskManagerLocation()); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), new LocalTaskManagerLocation()); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint1.isDiscarded()); coord.shutdown(JobStatus.FINISHED); @@ -589,7 +590,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // acknowledge from one of the tasks AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO); assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks()); assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); assertFalse(checkpoint.isDiscarded()); @@ -597,13 +598,13 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(taskOperatorSubtaskStates2, never()).registerSharedStates(any(SharedStateRegistry.class)); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO); assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); verify(subtaskState2, never()).registerSharedStates(any(SharedStateRegistry.class)); // acknowledge the other task. - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO); // the checkpoint is internally converted to a successful checkpoint and the // pending checkpoint object is disposed @@ -641,8 +642,8 @@ public class CheckpointCoordinatorTest extends TestLogger { coord.triggerCheckpoint(timestampNew, false); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -733,7 +734,7 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class)); // acknowledge one of the three tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO); // start the second checkpoint // trigger the first checkpoint. this should succeed @@ -757,10 +758,10 @@ public class CheckpointCoordinatorTest extends TestLogger { // we acknowledge the remaining two tasks from the first // checkpoint and two tasks from the second checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO); // now, the first checkpoint should be confirmed assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -771,7 +772,7 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId1), eq(timestamp1)); // send the last remaining ack for the second checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO); // now, the second checkpoint should be confirmed assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -880,7 +881,7 @@ public class CheckpointCoordinatorTest extends TestLogger { taskOperatorSubtaskStates1_3.putSubtaskStateByOperatorID(opID3, subtaskState1_3); // acknowledge one of the three tasks - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_2), TASK_MANAGER_LOCATION_INFO); // start the second checkpoint // trigger the first checkpoint. this should succeed @@ -917,13 +918,13 @@ public class CheckpointCoordinatorTest extends TestLogger { // we acknowledge one more task from the first checkpoint and the second // checkpoint completely. The second checkpoint should then subsume the first checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_3), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_3), TASK_MANAGER_LOCATION_INFO); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_1), TASK_MANAGER_LOCATION_INFO); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_1), TASK_MANAGER_LOCATION_INFO); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_2), TASK_MANAGER_LOCATION_INFO); // now, the second checkpoint should be confirmed, and the first discarded // actually both pending checkpoints are discarded, and the second has been transformed @@ -955,7 +956,7 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2)); // send the last remaining ack for the first checkpoint. This should not do anything - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_3), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_3), TASK_MANAGER_LOCATION_INFO); verify(subtaskState1_3, times(1)).discardState(); coord.shutdown(JobStatus.FINISHED); @@ -1027,7 +1028,7 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorSubtaskState subtaskState1 = mock(OperatorSubtaskState.class); taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO); // wait until the checkpoint must have expired. // we check every 250 msecs conservatively for 5 seconds @@ -1102,13 +1103,13 @@ public class CheckpointCoordinatorTest extends TestLogger { // non of the messages should throw an exception // wrong job id - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO); // unknown checkpoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO); // unknown ack vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO); coord.shutdown(JobStatus.FINISHED); } @@ -1173,7 +1174,7 @@ public class CheckpointCoordinatorTest extends TestLogger { taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger); // acknowledge the first trigger vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO); // verify that the subtask state has not been discarded verify(subtaskStateTrigger, never()).discardState(); @@ -1181,7 +1182,7 @@ public class CheckpointCoordinatorTest extends TestLogger { TaskStateSnapshot unknownSubtaskState = mock(TaskStateSnapshot.class); // receive an acknowledge message for an unknown vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO); // we should discard acknowledge messages from an unknown vertex belonging to our job verify(unknownSubtaskState, times(1)).discardState(); @@ -1189,21 +1190,21 @@ public class CheckpointCoordinatorTest extends TestLogger { TaskStateSnapshot differentJobSubtaskState = mock(TaskStateSnapshot.class); // receive an acknowledge message from an unknown job - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO); // we should not interfere with different jobs verify(differentJobSubtaskState, never()).discardState(); // duplicate acknowledge message for the trigger vertex TaskStateSnapshot triggerSubtaskState = mock(TaskStateSnapshot.class); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO); // duplicate acknowledge messages for a known vertex should not trigger discarding the state verify(triggerSubtaskState, never()).discardState(); // let the checkpoint fail at the first ack vertex reset(subtaskStateTrigger); - coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), new LocalTaskManagerLocation()); + coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO); assertTrue(pendingCheckpoint.isDiscarded()); @@ -1213,14 +1214,14 @@ public class CheckpointCoordinatorTest extends TestLogger { TaskStateSnapshot ackSubtaskState = mock(TaskStateSnapshot.class); // late acknowledge message from the second ack vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO); // check that we also cleaned up this state verify(ackSubtaskState, times(1)).discardState(); // receive an acknowledge message from an unknown job reset(differentJobSubtaskState); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO); // we should not interfere with different jobs verify(differentJobSubtaskState, never()).discardState(); @@ -1228,7 +1229,7 @@ public class CheckpointCoordinatorTest extends TestLogger { TaskStateSnapshot unknownSubtaskState2 = mock(TaskStateSnapshot.class); // receive an acknowledge message for an unknown vertex - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO); // we should discard acknowledge messages from an unknown vertex belonging to our job verify(unknownSubtaskState2, times(1)).discardState(); @@ -1394,7 +1395,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // tell the coordinator that the checkpoint is done final long ackTime = System.nanoTime(); - coord.receiveAcknowledgeMessage(ackMsg, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(ackMsg, TASK_MANAGER_LOCATION_INFO); // wait until the next checkpoint is triggered Long nextCallId = triggerCalls.take(); @@ -1494,7 +1495,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // acknowledge from one of the tasks AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO); assertEquals(1, pending.getNumberOfAcknowledgedTasks()); assertEquals(1, pending.getNumberOfNonAcknowledgedTasks()); assertFalse(pending.isDiscarded()); @@ -1502,13 +1503,13 @@ public class CheckpointCoordinatorTest extends TestLogger { assertFalse(savepointFuture.isDone()); // acknowledge the same task again (should not matter) - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO); assertFalse(pending.isDiscarded()); assertFalse(pending.isFullyAcknowledged()); assertFalse(savepointFuture.isDone()); // acknowledge the other task. - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO); // the checkpoint is internally converted to a successful checkpoint and the // pending checkpoint object is disposed @@ -1545,8 +1546,8 @@ public class CheckpointCoordinatorTest extends TestLogger { assertFalse(savepointFuture.isDone()); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1626,8 +1627,8 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(3, coord.getNumberOfPendingCheckpoints()); // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1643,8 +1644,8 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(3, coord.getNumberOfPendingCheckpoints()); // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1654,8 +1655,8 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(savepointFuture2.isDone()); // Ack first savepoint - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1725,7 +1726,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); // now, once we acknowledge one checkpoint, it should trigger the next one - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO); // this should have immediately triggered a new checkpoint now = System.currentTimeMillis(); @@ -1802,7 +1803,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // now we acknowledge the second checkpoint, which should subsume the first checkpoint // and allow two more checkpoints to be triggered // now, once we acknowledge one checkpoint, it should trigger the next one - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO); // after a while, there should be the new checkpoints final long newTimeout = System.currentTimeMillis() + 60000; @@ -1932,7 +1933,7 @@ public class CheckpointCoordinatorTest extends TestLogger { // ACK all savepoints long checkpointId = checkpointIDCounter.getLast(); for (int i = 0; i < numSavepoints; i++, checkpointId--) { - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO); } // After ACKs, all should be completed @@ -2051,7 +2052,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), subtaskState); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } for (int index = 0; index < jobVertex2.getParallelism(); index++) { @@ -2064,7 +2065,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), subtaskState); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); @@ -2169,7 +2170,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), taskOperatorSubtaskStates); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } @@ -2185,7 +2186,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), taskOperatorSubtaskStates); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); @@ -2297,8 +2298,8 @@ public class CheckpointCoordinatorTest extends TestLogger { StateObjectCollection.singleton(serializedKeyGroupStates), StateObjectCollection.empty())); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForCheckpoint), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForCheckpoint), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), TASK_MANAGER_LOCATION_INFO); CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); assertEquals(jid, success.getJobId()); @@ -2324,8 +2325,8 @@ public class CheckpointCoordinatorTest extends TestLogger { StateObjectCollection.empty())); checkpointId = checkpointIDCounter.getLast(); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForSavepoint), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForSavepoint), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), TASK_MANAGER_LOCATION_INFO); assertTrue(savepointFuture.isDone()); @@ -2466,7 +2467,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), taskOperatorSubtaskStates); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } //vertex 2 @@ -2491,7 +2492,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), taskOperatorSubtaskStates); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); @@ -3830,7 +3831,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new CheckpointMetrics(), taskStateSnapshot); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 273d56c..8f4f607 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.util.SerializableObject; import org.hamcrest.BaseMatcher; @@ -60,6 +59,8 @@ import static org.mockito.Mockito.when; */ public class CheckpointStateRestoreTest { + private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location"; + /** * Tests that on restore the task state is reset for each stateful task. */ @@ -130,11 +131,11 @@ public class CheckpointStateRestoreTest { StateObjectCollection.singleton(serializedKeyGroupStates), StateObjectCollection.empty())); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new LocalTaskManagerLocation()); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId), new LocalTaskManagerLocation()); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), TASK_MANAGER_LOCATION_INFO); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId), TASK_MANAGER_LOCATION_INFO); assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java index 411d9ce..9675631 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.TestLogger; @@ -448,7 +447,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger { graph.getJobID(), vertex1.getCurrentExecutionAttempt().getAttemptId(), checkpointToAcknowledge), - new LocalTaskManagerLocation()); + "Unknown location"); Map<Long, PendingCheckpoint> oldPendingCheckpoints = new HashMap<>(3); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index b0f5cf5..c953b5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -63,7 +63,6 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -686,7 +685,7 @@ public class FailoverRegionTest extends TestLogger { new CheckpointMetrics(), taskOperatorSubtaskStates); - checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, new LocalTaskManagerLocation()); + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location"); } } }