This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f689b1  [FLINK-10205] Introduce fault tolerance for InputSplits in batch execution
4f689b1 is described below

commit 4f689b1994eb2af584b447dc2f518e5d37bc2b6f
Author: Ryantaocer <43952554+ryantao...@users.noreply.github.com>
AuthorDate: Wed Apr 10 17:45:14 2019 +0800

    [FLINK-10205] Introduce fault tolerance for InputSplits in batch execution
    
    This closes #8125.
---
 .../api/common/io/DefaultInputSplitAssigner.java   |   9 ++
 .../api/common/io/LocatableInputSplitAssigner.java |  13 ++
 .../common/io/ReplicatingInputSplitAssigner.java   |   8 ++
 .../apache/flink/core/io/InputSplitAssigner.java   |   9 ++
 .../java/distcp/FileCopyTaskInputFormat.java       |  10 ++
 .../flink/runtime/executiongraph/Execution.java    |   7 ++
 .../runtime/executiongraph/ExecutionVertex.java    |  44 +++++--
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   9 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 136 ++++++++++++++++++++-
 .../classloading/jar/CustomInputSplitProgram.java  |   9 ++
 .../jar/StreamingCustomInputSplitProgram.java      |   9 ++
 11 files changed, 243 insertions(+), 20 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
index 25acc42..f8fd187 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
@@ -72,4 +72,13 @@ public class DefaultInputSplitAssigner implements InputSplitAssigner {
                }
                return next;
        }
+
+       @Override
+       public void returnInputSplit(List<InputSplit> splits, int taskId) {
+               synchronized (this.splits) {
+                       for (InputSplit split : splits) {
+                               this.splits.add(split);
+                       }
+               }
+       }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
index 517f40a..7cfcf9f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -21,11 +21,13 @@ package org.apache.flink.api.common.io;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.core.io.InputSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -202,6 +204,17 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
                }
        }
 
+       @Override
+       public void returnInputSplit(List<InputSplit> splits, int taskId) {
+               synchronized (this.unassigned) {
+                       for (InputSplit split : splits) {
+                               LocatableInputSplitWithCount lisw = new LocatableInputSplitWithCount((LocatableInputSplit) split);
+                               this.remoteSplitChooser.addInputSplit(lisw);
+                               this.unassigned.add(lisw);
+                       }
+               }
+       }
+
        private static final boolean isLocal(String flinkHost, String[] hosts) {
                if (flinkHost == null || hosts == null) {
                        return false;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
index e7dda94..ee2c721 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
@@ -21,9 +21,11 @@ package org.apache.flink.api.common.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Assigns each InputSplit to each requesting parallel instance.
@@ -78,4 +80,10 @@ public class ReplicatingInputSplitAssigner implements InputSplitAssigner {
                }
 
        }
+
+       @Override
+       public void returnInputSplit(List<InputSplit> splits, int taskId) {
+               Preconditions.checkArgument(taskId >= 0 && taskId < assignCounts.length);
+               assignCounts[taskId] = 0;
+       }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
index a17dfbe..fcf741d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
@@ -21,6 +21,8 @@ package org.apache.flink.core.io;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.List;
+
 /**
  * An input split assigner distributes the {@link InputSplit}s among the instances on which a
  * data source exists.
@@ -38,4 +40,11 @@ public interface InputSplitAssigner {
         */
        InputSplit getNextInputSplit(String host, int taskId);
 
+       /**
+        * Returns the input splits to the assigner if the task failed to process them.
+        *
+        * @param splits The list of input splits to be returned.
+        * @param taskId The id of the task that failed to process the input splits.
+        */
+       void returnInputSplit(List<InputSplit> splits, int taskId);
 }
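
For context, the new InputSplitAssigner#returnInputSplit contract added above can
be satisfied by making the returned splits assignable again, as the implementations
in this patch do. A minimal standalone sketch (the class name QueueingAssigner and
its "pending" field are hypothetical, not part of this commit):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    public class QueueingAssigner implements InputSplitAssigner {

        // splits not yet handed out, or handed back by a failed task
        private final Deque<InputSplit> pending = new ArrayDeque<>();

        public QueueingAssigner(List<InputSplit> splits) {
            pending.addAll(splits);
        }

        @Override
        public InputSplit getNextInputSplit(String host, int taskId) {
            synchronized (pending) {
                return pending.poll(); // null signals "no splits left"
            }
        }

        @Override
        public void returnInputSplit(List<InputSplit> splits, int taskId) {
            // a failed attempt hands its splits back, so the restarted
            // attempt can request and process them again
            synchronized (pending) {
                pending.addAll(splits);
            }
        }
    }
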
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index dfd9bf0..fbf7da4 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,15 @@ public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCo
                        LOGGER.info("Getting copy task for task: " + taskId);
                        return splits.poll();
                }
+
+               @Override
+               public void returnInputSplit(List<InputSplit> splits, int taskId) {
+                       synchronized (this.splits) {
+                               for (InputSplit split : splits) {
+                                       Preconditions.checkState(this.splits.add((FileCopyTaskInputSplit) split));
+                               }
+                       }
+               }
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index d5f091f..dd8db3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -316,6 +317,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
                }
        }
 
+       public InputSplit getNextInputSplit() {
+               final LogicalSlot slot = this.getAssignedResource();
+               final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+               return this.vertex.getNextInputSplit(host);
+       }
+
        @Override
        public TaskManagerLocation getAssignedResourceLocation() {
                // returns non-null only when a location is already assigned
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 42340b1..fc2a0f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
@@ -107,6 +109,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
        /** The current or latest execution attempt of this vertex's task. */
        private volatile Execution currentExecution;    // this field must never be null
 
+       private final ArrayList<InputSplit> inputSplits;
+
        // --------------------------------------------------------------------------------------------
 
        /**
@@ -188,6 +192,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
                getExecutionGraph().registerExecution(currentExecution);
 
                this.timeout = timeout;
+               this.inputSplits = new ArrayList<>();
        }
 
 
@@ -242,8 +247,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
        }
 
        public ExecutionEdge[] getInputEdges(int input) {
-               if (input < 0 || input >= this.inputEdges.length) {
-                       throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
+               if (input < 0 || input >= inputEdges.length) {
+                       throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, inputEdges.length));
                }
                return inputEdges[input];
        }
@@ -252,6 +257,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
                return locationConstraint;
        }
 
+       public InputSplit getNextInputSplit(String host) {
+               final int taskId = getParallelSubtaskIndex();
+               synchronized (inputSplits) {
+                       final InputSplit nextInputSplit = jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
+                       if (nextInputSplit != null) {
+                               inputSplits.add(nextInputSplit);
+                       }
+                       return nextInputSplit;
+               }
+       }
+
        @Override
        public Execution getCurrentExecutionAttempt() {
                return currentExecution;
@@ -371,7 +387,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
                }
 
-               this.inputEdges[inputNumber] = edges;
+               inputEdges[inputNumber] = edges;
 
                // add the consumers to the source
                // for now (until the receiver initiated handshake is in place), we need to register the
@@ -594,11 +610,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
                                        timestamp,
                                        timeout);
 
-                               this.currentExecution = newExecution;
+                               currentExecution = newExecution;
+
+                               synchronized (inputSplits) {
+                                       InputSplitAssigner assigner = jobVertex.getSplitAssigner();
+                                       if (assigner != null) {
+                                               assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
+                                               inputSplits.clear();
+                                       }
+                               }
 
                                CoLocationGroup grp = jobVertex.getCoLocationGroup();
                                if (grp != null) {
-                                       this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
+                                       locationConstraint = grp.getLocationConstraint(subTaskIndex);
                                }
 
                                // register this execution at the execution graph, to receive call backs
@@ -643,8 +667,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
        @VisibleForTesting
        public void deployToSlot(LogicalSlot slot) throws JobException {
-               if (this.currentExecution.tryAssignResource(slot)) {
-                       this.currentExecution.deploy();
+               if (currentExecution.tryAssignResource(slot)) {
+                       currentExecution.deploy();
                } else {
                        throw new IllegalStateException("Could not assign resource " + slot + " to current execution " +
                                currentExecution + '.');
@@ -659,7 +683,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
        public CompletableFuture<?> cancel() {
                // to avoid any case of mixup in the presence of concurrent calls,
                // we copy a reference to the stack to make sure both calls go to the same Execution
-               final Execution exec = this.currentExecution;
+               final Execution exec = currentExecution;
                exec.cancel();
                return exec.getReleaseFuture();
        }
@@ -669,11 +693,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
        }
 
        public void stop() {
-               this.currentExecution.stop();
+               currentExecution.stop();
        }
 
        public void fail(Throwable t) {
-               this.currentExecution.fail(t);
+               currentExecution.fail(t);
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index afd39e0..845a40b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
@@ -566,16 +565,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
                        return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
                }
 
-               final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
-               if (splitAssigner == null) {
+               if (vertex.getSplitAssigner() == null) {
                        log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
                        return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
                }
 
-               final LogicalSlot slot = execution.getAssignedResource();
-               final int taskId = execution.getVertex().getParallelSubtaskIndex();
-               final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
-               final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+               final InputSplit nextInputSplit = execution.getNextInputSplit();
 
                if (log.isDebugEnabled()) {
                        log.debug("Send next input split {}.", nextInputSplit);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f1baf80..396754b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -22,14 +22,18 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -54,13 +58,16 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -68,6 +75,7 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -87,6 +95,8 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -128,6 +138,7 @@ import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -164,6 +175,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -831,6 +843,124 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       private JobGraph createDataSourceJobGraph() throws Exception {
+               final TextInputFormat inputFormat = new TextInputFormat(new Path("."));
+               final InputFormatVertex producer = new InputFormatVertex("Producer");
+               new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
+               producer.setInvokableClass(DataSourceTask.class);
+
+               final JobVertex consumer = new JobVertex("Consumer");
+               consumer.setInvokableClass(NoOpInvokable.class);
+               consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+               final JobGraph jobGraph = new JobGraph(producer, consumer);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
+               jobGraph.setExecutionConfig(executionConfig);
+
+               return jobGraph;
+       }
+
+       /**
+        * Tests that {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)}
+        * hands the same input splits to a restarted execution attempt after a failover.
+        */
+       @Test
+       public void testRequestNextInputSplitWithDataSourceFailover() throws Exception {
+
+               final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+               configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
+                       FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       dataSourceJobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                       final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+                       final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+                       testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+                       rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+                       final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
+                       final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                               .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+                                       tddFuture.complete(taskDeploymentDescriptor);
+                                       return CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                               .createTestingTaskExecutorGateway();
+                       rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
+
+                       final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
+
+                       final AllocationID allocationId = allocationIdFuture.get();
+
+                       final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+                       jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+                       final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+                       final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
+
+                       assertThat(slotOffers, hasSize(1));
+                       assertThat(slotOffers, contains(slotOffer));
+
+                       // obtain tdd for the result partition ids
+                       final TaskDeploymentDescriptor tdd = tddFuture.get();
+
+                       final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       final TaskInformation taskInformation = tdd.getSerializedTaskInformation()
+                               .deserializeValue(getClass().getClassLoader());
+                       JobVertexID vertexID = taskInformation.getJobVertexId();
+
+                       // get the split assigned to the first execution attempt
+                       SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+                       // start a new version of this execution
+                       ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
+                       Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+                       ExecutionVertex executionVertex = execution.getVertex();
+
+                       gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FAILED)).get();
+                       Execution newExecution = executionVertex.getCurrentExecutionAttempt();
+
+                       // the new attempt must receive the returned split again
+                       SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+                       Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData());
+
+                       // get the next split (split3)
+                       SerializedInputSplit split3 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+                       Assert.assertNotEquals(split1.getInputSplitData().length, split3.getInputSplitData().length);
+                       gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+                       InputSplit nullSplit = InstantiationUtil.deserializeObject(
+                               gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       Assert.assertNull(nullSplit);
+
+                       InputSplit nullSplit1 = InstantiationUtil.deserializeObject(
+                               gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
+                       Assert.assertNull(nullSplit1);
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+               }
+       }
+
        @Test
        public void testRequestNextInputSplit() throws Exception {
                final List<TestingInputSplit> expectedInputSplits = Arrays.asList(
@@ -1319,9 +1449,9 @@ public class JobMasterTest extends TestLogger {
 
                        @Override
                        public CompletableFuture<String> triggerSavepoint(
-                                       @Nullable final String targetDirectory,
-                                       final boolean cancelJob,
-                                       final Time timeout) {
+                               @Nullable final String targetDirectory,
+                               final boolean cancelJob,
+                               final Time timeout) {
                                return new CompletableFuture<>();
                        }
                };
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index a5a2531..fe6c172 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -151,5 +151,14 @@ public class CustomInputSplitProgram {
                                }
                        }
                }
+
+               @Override
+               public void returnInputSplit(List<InputSplit> splits, int taskId) {
+                       synchronized (this) {
+                               for (InputSplit split : splits) {
+                                       remainingSplits.add((CustomInputSplit) split);
+                               }
+                       }
+               }
        }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 69421a6..26fe96a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -157,6 +157,15 @@ public class StreamingCustomInputSplitProgram {
                                }
                        }
                }
+
+               @Override
+               public void returnInputSplit(List<InputSplit> splits, int taskId) {
+                       synchronized (this) {
+                               for (InputSplit split : splits) {
+                                       remainingSplits.add((CustomInputSplit) split);
+                               }
+                       }
+               }
        }
 
        private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
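
As a usage note, the recovery path this patch enables can be exercised against the
DefaultInputSplitAssigner directly. A standalone sketch compiled against the
patched API (the class name ReturnSplitSketch and the host name "host-a" are
arbitrary placeholders):

    import java.util.Collections;

    import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplit;

    public class ReturnSplitSketch {
        public static void main(String[] args) {
            // an assigner with a single split
            InputSplit only = new GenericInputSplit(0, 1);
            DefaultInputSplitAssigner assigner =
                new DefaultInputSplitAssigner(Collections.singletonList(only));

            // subtask 0 takes the only split, then fails before finishing it
            InputSplit taken = assigner.getNextInputSplit("host-a", 0);
            System.out.println(assigner.getNextInputSplit("host-a", 0)); // null: pool exhausted

            // on restart, ExecutionVertex hands the unfinished split back
            assigner.returnInputSplit(Collections.singletonList(taken), 0);

            // the returned split is assignable again for the new attempt
            System.out.println(assigner.getNextInputSplit("host-a", 0) != null); // true
        }
    }
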

Reply via email to