This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfd74a2e42bc06bbf85572225b902caf949b2aa3
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Jun 6 11:10:01 2019 +0200

    [FLINK-12683][chkpt] Simplify task manager location info generation for 
logging statements
    
    This closes #8580.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  13 +--
 .../flink/runtime/scheduler/LegacyScheduler.java   |  20 ++--
 .../CheckpointCoordinatorFailureTest.java          |   5 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |   3 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 129 +++++++++++----------
 .../checkpoint/CheckpointStateRestoreTest.java     |  13 ++-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |   3 +-
 .../runtime/executiongraph/FailoverRegionTest.java |   3 +-
 8 files changed, 92 insertions(+), 97 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0b6a8ad..b6f5a81 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -724,15 +723,13 @@ public class CheckpointCoordinator {
         * Receives a {@link DeclineCheckpoint} message for a pending 
checkpoint.
         *
         * @param message Checkpoint decline from the task manager
-        * @param taskManagerLocation The location of the decline checkpoint 
message's sender
+        * @param taskManagerLocationInfo The location info of the decline 
checkpoint message's sender
         */
-       public void receiveDeclineMessage(DeclineCheckpoint message, @Nullable 
TaskManagerLocation taskManagerLocation) {
+       public void receiveDeclineMessage(DeclineCheckpoint message, String 
taskManagerLocationInfo) {
                if (shutdown || message == null) {
                        return;
                }
 
-               final String taskManagerLocationInfo = (taskManagerLocation != 
null ? taskManagerLocation.toString() : "unknown location");
-
                if (!job.equals(message.getJob())) {
                        throw new IllegalArgumentException("Received 
DeclineCheckpoint message for job " +
                                message.getJob() + " from " + 
taskManagerLocationInfo + " while this coordinator handles job " + job);
@@ -785,19 +782,17 @@ public class CheckpointCoordinator {
         *
         * @param message Checkpoint ack from the task manager
         *
-        * @param taskManagerLocation The location of the acknowledge 
checkpoint message's sender
+        * @param taskManagerLocationInfo The location info of the acknowledge 
checkpoint message's sender
         * @return Flag indicating whether the ack'd checkpoint was associated
         * with a pending checkpoint.
         *
         * @throws CheckpointException If the checkpoint cannot be added to the 
completed checkpoint store.
         */
-       public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, 
@Nullable TaskManagerLocation taskManagerLocation) throws CheckpointException {
+       public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, 
String taskManagerLocationInfo) throws CheckpointException {
                if (shutdown || message == null) {
                        return false;
                }
 
-               final String taskManagerLocationInfo = (taskManagerLocation != 
null ? taskManagerLocation.toString() : "unknown location");
-
                if (!job.equals(message.getJob())) {
                        LOG.error("Received wrong AcknowledgeCheckpoint message 
for job {} from {} : {}", job, taskManagerLocationInfo, message);
                        return false;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index 24682be..2cbe7d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -528,13 +528,12 @@ public class LegacyScheduler implements SchedulerNG {
                        checkpointMetrics,
                        checkpointState);
 
-               final Optional<TaskManagerLocation> optionalLocation = 
tryRetrieveTaskManagerLocation(executionAttemptID);
-               final TaskManagerLocation taskManagerLocation = 
optionalLocation.isPresent() ? optionalLocation.get() : null;
+               final String taskManagerLocationInfo = 
retrieveTaskManagerLocation(executionAttemptID);
 
                if (checkpointCoordinator != null) {
                        ioExecutor.execute(() -> {
                                try {
-                                       
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, 
taskManagerLocation);
+                                       
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, 
taskManagerLocationInfo);
                                } catch (Throwable t) {
                                        log.warn("Error while processing 
checkpoint acknowledgement message", t);
                                }
@@ -554,13 +553,12 @@ public class LegacyScheduler implements SchedulerNG {
                mainThreadExecutor.assertRunningInMainThread();
 
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-               final Optional<TaskManagerLocation> optionalLocation = 
tryRetrieveTaskManagerLocation(decline.getTaskExecutionId());
-               final TaskManagerLocation taskManagerLocation = 
optionalLocation.isPresent() ? optionalLocation.get() : null;
+               final String taskManagerLocationInfo = 
retrieveTaskManagerLocation(decline.getTaskExecutionId());
 
                if (checkpointCoordinator != null) {
                        ioExecutor.execute(() -> {
                                try {
-                                       
checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocation);
+                                       
checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo);
                                } catch (Exception e) {
                                        log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
                                }
@@ -631,8 +629,12 @@ public class LegacyScheduler implements SchedulerNG {
                        terminationFuture.thenApply((jobStatus -> path)));
        }
 
-       private Optional<TaskManagerLocation> 
tryRetrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
-               final Execution currentExecution = 
executionGraph.getRegisteredExecutions().get(executionAttemptID);
-               return currentExecution != null ? 
Optional.ofNullable(currentExecution.getAssignedResourceLocation()) : 
Optional.empty();
+       private String retrieveTaskManagerLocation(ExecutionAttemptID 
executionAttemptID) {
+               final Optional<Execution> currentExecution = 
Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID));
+
+               return currentExecution
+                       .map(Execution::getAssignedResourceLocation)
+                       .map(TaskManagerLocation::toString)
+                       .orElse("Unknown location");
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 16f31c6..1709fae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -25,12 +25,11 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -113,7 +112,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new 
CheckpointMetrics(), subtaskState);
 
                try {
-                       coord.receiveAcknowledgeMessage(acknowledgeMessage, new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeMessage, 
"Unknown location");
                        fail("Expected a checkpoint exception because the 
completed checkpoint store could not " +
                                "store the completed checkpoint.");
                } catch (CheckpointException e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 5c9b857..762b805 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -203,7 +202,7 @@ public class CheckpointCoordinatorMasterHooksTest {
                verify(statelessHook, times(1)).triggerCheckpoint(anyLong(), 
anyLong(), any(Executor.class));
 
                final long checkpointId = 
cc.getPendingCheckpoints().values().iterator().next().getCheckpointId();
-               cc.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
execId, checkpointId), new LocalTaskManagerLocation());
+               cc.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
execId, checkpointId), "Unknown location");
                assertEquals(0, cc.getNumberOfPendingCheckpoints());
 
                assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 8d9c6a8..1676b01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -51,7 +51,6 @@ import 
org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.InstantiationUtil;
@@ -116,6 +115,8 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointCoordinatorTest extends TestLogger {
 
+       private static final String TASK_MANAGER_LOCATION_INFO = "Unknown 
location";
+
        @Rule
        public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -353,21 +354,21 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
 
                        // acknowledge from one of the tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
                        assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
                        assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // acknowledge the same task again (should not matter)
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId), "Unknown location");
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
 
 
                        // decline checkpoint from the other task, this should 
cancel the checkpoint
                        // and trigger a new one
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), new LocalTaskManagerLocation());
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
                        assertTrue(checkpoint.isDiscarded());
 
                        // the canceler is also removed
@@ -379,8 +380,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), new LocalTaskManagerLocation());
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId), new LocalTaskManagerLocation());
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
                        assertTrue(checkpoint.isDiscarded());
 
                        coord.shutdown(JobStatus.FINISHED);
@@ -480,7 +481,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        }
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id), new LocalTaskManagerLocation());
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
@@ -505,8 +506,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id), new LocalTaskManagerLocation());
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id), new LocalTaskManagerLocation());
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
                        assertTrue(checkpoint1.isDiscarded());
 
                        coord.shutdown(JobStatus.FINISHED);
@@ -589,7 +590,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                        // acknowledge from one of the tasks
                        AcknowledgeCheckpoint acknowledgeCheckpoint1 = new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), 
taskOperatorSubtaskStates2);
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
TASK_MANAGER_LOCATION_INFO);
                        assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
                        assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
                        assertFalse(checkpoint.isDiscarded());
@@ -597,13 +598,13 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        verify(taskOperatorSubtaskStates2, 
never()).registerSharedStates(any(SharedStateRegistry.class));
 
                        // acknowledge the same task again (should not matter)
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
TASK_MANAGER_LOCATION_INFO);
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
                        verify(subtaskState2, 
never()).registerSharedStates(any(SharedStateRegistry.class));
 
                        // acknowledge the other task.
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), 
taskOperatorSubtaskStates1), new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), 
taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
                        // the checkpoint is internally converted to a 
successful checkpoint and the
                        // pending checkpoint object is disposed
@@ -641,8 +642,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        coord.triggerCheckpoint(timestampNew, false);
 
                        long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew), 
TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew), 
TASK_MANAGER_LOCATION_INFO);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -733,7 +734,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        verify(triggerVertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), 
any(CheckpointOptions.class));
 
                        // acknowledge one of the three tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1), 
TASK_MANAGER_LOCATION_INFO);
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
@@ -757,10 +758,10 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                        // we acknowledge the remaining two tasks from the first
                        // checkpoint and two tasks from the second checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1), 
TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2), 
TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1), 
TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2), 
TASK_MANAGER_LOCATION_INFO);
 
                        // now, the first checkpoint should be confirmed
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -771,7 +772,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        verify(commitVertex.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointId1), eq(timestamp1));
 
                        // send the last remaining ack for the second checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2), 
TASK_MANAGER_LOCATION_INFO);
 
                        // now, the second checkpoint should be confirmed
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -880,7 +881,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        
taskOperatorSubtaskStates1_3.putSubtaskStateByOperatorID(opID3, 
subtaskState1_3);
 
                        // acknowledge one of the three tasks
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_2), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_2), TASK_MANAGER_LOCATION_INFO);
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
@@ -917,13 +918,13 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        // we acknowledge one more task from the first 
checkpoint and the second
                        // checkpoint completely. The second checkpoint should 
then subsume the first checkpoint
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_3), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_3), TASK_MANAGER_LOCATION_INFO);
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_1), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_1), TASK_MANAGER_LOCATION_INFO);
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_1), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_1), TASK_MANAGER_LOCATION_INFO);
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_2), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new 
CheckpointMetrics(), taskOperatorSubtaskStates2_2), TASK_MANAGER_LOCATION_INFO);
 
                        // now, the second checkpoint should be confirmed, and 
the first discarded
                        // actually both pending checkpoints are discarded, and 
the second has been transformed
@@ -955,7 +956,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        verify(commitVertex.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2));
 
                        // send the last remaining ack for the first 
checkpoint. This should not do anything
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_3), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new 
CheckpointMetrics(), taskOperatorSubtaskStates1_3), TASK_MANAGER_LOCATION_INFO);
                        verify(subtaskState1_3, times(1)).discardState();
 
                        coord.shutdown(JobStatus.FINISHED);
@@ -1027,7 +1028,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
                        
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new 
CheckpointMetrics(), taskOperatorSubtaskStates1), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new 
CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
 
                        // wait until the checkpoint must have expired.
                        // we check every 250 msecs conservatively for 5 seconds
@@ -1102,13 +1103,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        // non of the messages should throw an exception
 
                        // wrong job id
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), 
TASK_MANAGER_LOCATION_INFO);
 
                        // unknown checkpoint
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
 
                        // unknown ack vertex
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId), 
TASK_MANAGER_LOCATION_INFO);
 
                        coord.shutdown(JobStatus.FINISHED);
                }
@@ -1173,7 +1174,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                
taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, 
subtaskStateTrigger);
 
                // acknowledge the first trigger vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), new 
LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), 
TASK_MANAGER_LOCATION_INFO);
 
                // verify that the subtask state has not been discarded
                verify(subtaskStateTrigger, never()).discardState();
@@ -1181,7 +1182,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                TaskStateSnapshot unknownSubtaskState = 
mock(TaskStateSnapshot.class);
 
                // receive an acknowledge message for an unknown vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
                // we should discard acknowledge messages from an unknown 
vertex belonging to our job
                verify(unknownSubtaskState, times(1)).discardState();
@@ -1189,21 +1190,21 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                TaskStateSnapshot differentJobSubtaskState = 
mock(TaskStateSnapshot.class);
 
                // receive an acknowledge message from an unknown job
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
                // we should not interfere with different jobs
                verify(differentJobSubtaskState, never()).discardState();
 
                // duplicate acknowledge message for the trigger vertex
                TaskStateSnapshot triggerSubtaskState = 
mock(TaskStateSnapshot.class);
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), triggerSubtaskState), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new 
CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
                // duplicate acknowledge messages for a known vertex should not 
trigger discarding the state
                verify(triggerSubtaskState, never()).discardState();
 
                // let the checkpoint fail at the first ack vertex
                reset(subtaskStateTrigger);
-               coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, 
ackAttemptId1, checkpointId), new LocalTaskManagerLocation());
+               coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, 
ackAttemptId1, checkpointId), TASK_MANAGER_LOCATION_INFO);
 
                assertTrue(pendingCheckpoint.isDiscarded());
 
@@ -1213,14 +1214,14 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                TaskStateSnapshot ackSubtaskState = 
mock(TaskStateSnapshot.class);
 
                // late acknowledge message from the second ack vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new 
CheckpointMetrics(), ackSubtaskState), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new 
CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
                // check that we also cleaned up this state
                verify(ackSubtaskState, times(1)).discardState();
 
                // receive an acknowledge message from an unknown job
                reset(differentJobSubtaskState);
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new 
JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), 
differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
 
                // we should not interfere with different jobs
                verify(differentJobSubtaskState, never()).discardState();
@@ -1228,7 +1229,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                TaskStateSnapshot unknownSubtaskState2 = 
mock(TaskStateSnapshot.class);
 
                // receive an acknowledge message for an unknown vertex
-               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState2), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new 
CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
 
                // we should discard acknowledge messages from an unknown 
vertex belonging to our job
                verify(unknownSubtaskState2, times(1)).discardState();
@@ -1394,7 +1395,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                        // tell the coordinator that the checkpoint is done
                        final long ackTime = System.nanoTime();
-                       coord.receiveAcknowledgeMessage(ackMsg, new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(ackMsg, 
TASK_MANAGER_LOCATION_INFO);
 
                        // wait until the next checkpoint is triggered
                        Long nextCallId = triggerCalls.take();
@@ -1494,7 +1495,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                // acknowledge from one of the tasks
                AcknowledgeCheckpoint acknowledgeCheckpoint2 = new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), 
taskOperatorSubtaskStates2);
-               coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, new 
LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, 
TASK_MANAGER_LOCATION_INFO);
                assertEquals(1, pending.getNumberOfAcknowledgedTasks());
                assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
                assertFalse(pending.isDiscarded());
@@ -1502,13 +1503,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                assertFalse(savepointFuture.isDone());
 
                // acknowledge the same task again (should not matter)
-               coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, new 
LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, 
TASK_MANAGER_LOCATION_INFO);
                assertFalse(pending.isDiscarded());
                assertFalse(pending.isFullyAcknowledged());
                assertFalse(savepointFuture.isDone());
 
                // acknowledge the other task.
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), 
new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), 
TASK_MANAGER_LOCATION_INFO);
 
                // the checkpoint is internally converted to a successful 
checkpoint and the
                // pending checkpoint object is disposed
@@ -1545,8 +1546,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertFalse(savepointFuture.isDone());
 
                long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointIdNew), new LocalTaskManagerLocation());
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1626,8 +1627,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
                // 2nd checkpoint should subsume the 1st checkpoint, but not 
the savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId2), new LocalTaskManagerLocation());
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId2), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
 
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1643,8 +1644,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
                // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2), new LocalTaskManagerLocation());
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
 
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
                assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1654,8 +1655,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertTrue(savepointFuture2.isDone());
 
                // Ack first savepoint
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1), new LocalTaskManagerLocation());
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1), new LocalTaskManagerLocation());
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1725,7 +1726,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        .triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
 
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 1L), new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
 
                        // this should have immediately triggered a new 
checkpoint
                        now = System.currentTimeMillis();
@@ -1802,7 +1803,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        // now we acknowledge the second checkpoint, which 
should subsume the first checkpoint
                        // and allow two more checkpoints to be triggered
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 2L), new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
 
                        // after a while, there should be the new checkpoints
                        final long newTimeout = System.currentTimeMillis() + 
60000;
@@ -1932,7 +1933,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                // ACK all savepoints
                long checkpointId = checkpointIDCounter.getLast();
                for (int i = 0; i < numSavepoints; i++, checkpointId--) {
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), 
TASK_MANAGER_LOCATION_INFO);
                }
 
                // After ACKs, all should be completed
@@ -2051,7 +2052,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                        subtaskState);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
                for (int index = 0; index < jobVertex2.getParallelism(); 
index++) {
@@ -2064,7 +2065,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                        subtaskState);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
                List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
@@ -2169,7 +2170,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                taskOperatorSubtaskStates);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
 
@@ -2185,7 +2186,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                        taskOperatorSubtaskStates);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
                List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
@@ -2297,8 +2298,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        
StateObjectCollection.singleton(serializedKeyGroupStates),
                                        StateObjectCollection.empty()));
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStatesForCheckpoint), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStatesForCheckpoint), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), 
TASK_MANAGER_LOCATION_INFO);
 
                        CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
                        assertEquals(jid, success.getJobId());
@@ -2324,8 +2325,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        StateObjectCollection.empty()));
 
                        checkpointId = checkpointIDCounter.getLast();
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStatesForSavepoint), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStatesForSavepoint), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), 
TASK_MANAGER_LOCATION_INFO);
 
                        assertTrue(savepointFuture.isDone());
 
@@ -2466,7 +2467,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                        taskOperatorSubtaskStates);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
                //vertex 2
@@ -2491,7 +2492,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        new CheckpointMetrics(),
                                        taskOperatorSubtaskStates);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
 
                List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
@@ -3830,7 +3831,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new CheckpointMetrics(),
                                taskStateSnapshot);
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
new LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 273d56c..8f4f607 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.SerializableObject;
 
 import org.hamcrest.BaseMatcher;
@@ -60,6 +59,8 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointStateRestoreTest {
 
+       private static final String TASK_MANAGER_LOCATION_INFO = "Unknown 
location";
+
        /**
         * Tests that on restore the task state is reset for each stateful task.
         */
@@ -130,11 +131,11 @@ public class CheckpointStateRestoreTest {
                                        
StateObjectCollection.singleton(serializedKeyGroupStates),
                                        StateObjectCollection.empty()));
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), new LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), new 
LocalTaskManagerLocation());
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId), new 
LocalTaskManagerLocation());
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates), TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId), 
TASK_MANAGER_LOCATION_INFO);
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId), 
TASK_MANAGER_LOCATION_INFO);
 
                        assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
index 411d9ce..9675631 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
@@ -448,7 +447,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest 
extends TestLogger {
                                graph.getJobID(),
                                
vertex1.getCurrentExecutionAttempt().getAttemptId(),
                                checkpointToAcknowledge),
-                               new LocalTaskManagerLocation());
+                               "Unknown location");
 
                Map<Long, PendingCheckpoint> oldPendingCheckpoints = new 
HashMap<>(3);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index b0f5cf5..c953b5d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -63,7 +63,6 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -686,7 +685,7 @@ public class FailoverRegionTest extends TestLogger {
                                        new CheckpointMetrics(),
                                        taskOperatorSubtaskStates);
 
-                               
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, new 
LocalTaskManagerLocation());
+                               
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown 
location");
                        }
                }
        }

Reply via email to