buildbot success on flink-docs-release-0.10

2016-10-16 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10.
Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/363

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-master

2016-10-16 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-master.
Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/494

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





[4/5] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager

2016-10-16 Thread sewen
[FLINK-4689] [cluster management] Implement a simple slot provider for the new 
job manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3

Branch: refs/heads/flip-6
Commit: e91b82d3c868e18611064f905b345906f1414f84
Parents: 655722a
Author: Kurt Young 
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Stephan Ewen 
Committed: Sun Oct 16 22:14:41 2016 +0200

--
 .../apache/flink/runtime/instance/SlotPool.java |  1 -
 .../jobmanager/slots/PooledSlotProvider.java| 73 
 .../flink/runtime/jobmaster/JobMaster.java  | 24 ---
 3 files changed, 89 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
 
internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
 
-   final SlotOwner owner = this;
return future.thenApplyAsync(
new ApplyFunction() {
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 000..5655fc2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying 
storage.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+   /** The pool which holds all the slots. */
+   private final SlotPool slotPool;
+
+   /** The timeout for allocation. */
+   private final Time timeout;
+
+   public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+   this.slotPool = slotPool;
+   this.timeout = timeout;
+   }
+
+   @Override
+   public Future allocateSlot(ScheduledUnit task,
+   boolean allowQueued) throws NoResourceAvailableException
+   {
+   checkNotNull(task);
+
+   final JobID jobID = 
task.getTaskToExecute().getVertex().getJobId();
+   final Future future = 
slotPool.allocateSimpleSlot(jobID, Resou

[5/5] flink git commit: [FLINK-4351] [cluster management] JobManager handle TaskManager's registration

2016-10-16 Thread sewen
[FLINK-4351] [cluster management] JobManager handle TaskManager's registration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b

Branch: refs/heads/flip-6
Commit: a19cae3b07963776c07c0aae7bee806004f59429
Parents: e91b82d
Author: Kurt Young 
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen 
Committed: Sun Oct 16 22:14:41 2016 +0200

--
 .../flink/runtime/jobmaster/JobMaster.java  | 75 +---
 .../runtime/jobmaster/JobMasterGateway.java | 15 ++--
 .../runtime/taskexecutor/JobLeaderService.java  | 55 ++
 .../runtime/taskexecutor/TaskExecutor.java  |  2 +-
 .../taskexecutor/TaskManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 10 ++-
 6 files changed, 102 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 05c20d3..8cb9946 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import 
org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends 
RpcEndpoint {
/** Configuration of the JobManager */
private final Configuration configuration;
 
+   private final Time rpcTimeout;
+
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
 
@@ -152,7 +161,7 @@ public class JobMaster extends 
RpcEndpoint {
 
private volatile UUID leaderSessionID;
 
-   // - resource manager 
+   // - ResourceManager 
 
/** Leader retriever service used to locate ResourceManager's address */
private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends 
RpcEndpoint {
/** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
private ResourceManagerConnection resourceManagerConnection;
 
+   // - TaskManagers 
+
+   private final Map> registeredTaskManagers;
 
// 

 
@@ -181,6 +193,7 @@ public class JobMaster extends 
RpcEndpoint {
 
this.jobGraph = checkNo

[3/5] flink git commit: [FLINK-4836] [cluster management] Add flink mini cluster (part 1)

2016-10-16 Thread sewen
[FLINK-4836] [cluster management] Add flink mini cluster (part 1)

This implements
  - mini cluster configuration
  - startup / shutdown of common services (rpc, ha)
  - startup / shutdown of JobManager and Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655722a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655722a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655722a2

Branch: refs/heads/flip-6
Commit: 655722a29edcca261653f54fa07fb7b4c94602a6
Parents: 7947662
Author: Stephan Ewen 
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen 
Committed: Sun Oct 16 22:14:36 2016 +0200

--
 .../org/apache/flink/util/ExceptionUtils.java   |  62 ++-
 .../HighAvailabilityServicesUtils.java  |  17 +
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java |   1 -
 .../jobmaster/MiniClusterJobDispatcher.java | 394 -
 .../flink/runtime/minicluster/MiniCluster.java  | 406 ++
 .../minicluster/MiniClusterConfiguration.java   | 147 +++
 .../minicluster/MiniClusterJobDispatcher.java   | 418 +++
 .../resourcemanager/ResourceManager.java|   2 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../TestingHighAvailabilityServices.java|   2 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  79 
 12 files changed, 1126 insertions(+), 407 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 0f6f24f..d34b236 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class ExceptionUtils {
 
@@ -58,7 +61,54 @@ public final class ExceptionUtils {
return e.getClass().getName() + " (error while printing 
stack trace)";
}
}
-   
+
+   /**
+* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) 
suppressed exception}
+* to a prior exception, or returns the new exception, if no prior 
exception exists.
+* 
+* {@code
+* 
+* public void closeAllThings() throws Exception {
+* Exception ex = null;
+* try {
+* component.shutdown();
+* } catch (Exception e) {
+* ex = firstOrSuppressed(e, ex);
+* }
+* try {
+* anotherComponent.stop();
+* } catch (Exception e) {
+* ex = firstOrSuppressed(e, ex);
+* }
+* try {
+* lastComponent.shutdown();
+* } catch (Exception e) {
+* ex = firstOrSuppressed(e, ex);
+* }
+* 
+* if (ex != null) {
+* throw ex;
+* }
+* }
+* }
+* 
+* @param newException The newly occurred exception
+* @param previous The previously occurred exception, possibly null.
+* 
+* @return The new exception, if no previous exception exists, or the 
previous exception with the
+* new exception in the list of suppressed exceptions.
+*/
+   public static  T firstOrSuppressed(T newException, 
@Nullable T previous) {
+   checkNotNull(newException, "newException");
+
+   if (previous == null) {
+   return newException;
+   } else {
+   previous.addSuppressed(newException);
+   return previous;
+   }
+   }
+
/**
 * Throws the given {@code Throwable} in scenarios where the signatures 
do not allow you to
 * throw an arbitrary Throwable. Errors and RuntimeExceptions are 
thrown directly, other exceptions
@@ -161,10 +211,8 @@ public final class ExceptionUtils {
}
}
 
-   /**
-* Private constructor to prevent instantiation.
-*/
-   private ExceptionUtils() {
-   throw new RuntimeException();
-   }
+   // 

+
+ 

[2/5] flink git commit: [FLINK-4835] [cluster management] Add embedded version of the high-availability services

2016-10-16 Thread sewen
[FLINK-4835] [cluster management] Add embedded version of the high-availability 
services

This includes the addition of the EmbeddedLeaderService
and a clean shutdown hook for all high availability services.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79476624
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79476624
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79476624

Branch: refs/heads/flip-6
Commit: 7947662467d102edc675fde19948b7638a044343
Parents: da16b0a
Author: Stephan Ewen 
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen 
Committed: Sun Oct 16 22:09:39 2016 +0200

--
 .../StandaloneCheckpointRecoveryFactory.java|   4 +-
 .../highavailability/EmbeddedNonHaServices.java |  62 +++
 .../HighAvailabilityServices.java   |   7 +-
 .../runtime/highavailability/NonHaServices.java |  62 +--
 .../highavailability/ZookeeperHaServices.java   |  12 +-
 .../nonha/AbstractNonHaServices.java| 175 +++
 .../nonha/EmbeddedLeaderService.java| 466 +++
 .../TestingHighAvailabilityServices.java|   9 +
 8 files changed, 736 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
public CompletedCheckpointStore createCheckpointStore(JobID jobId, 
ClassLoader userClassLoader)
throws Exception {
 
-   return new 
StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-   .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+   return new StandaloneCompletedCheckpointStore(
+   
CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
}
 
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 000..58da287
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the 
non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in 
the same process.
+ *
+ * This implementation has no dependencies on any external services. It 
returns a fix
+ * pre-configured ResourceManager, and stores checkpoints and metadata simply 
on the heap or
+ * on a local file system and therefore in a storage without guarantees.
+ */
+public class Embedded

[1/5] flink git commit: [hotfix] [tests] Migrate some test tasks to Java

2016-10-16 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/flip-6 2486d3787 -> a19cae3b0


[hotfix] [tests] Migrate some test tasks to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7

Branch: refs/heads/flip-6
Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7
Parents: 2486d37
Author: Stephan Ewen 
Authored: Fri Oct 14 23:54:29 2016 +0200
Committer: Stephan Ewen 
Committed: Sun Oct 16 22:01:32 2016 +0200

--
 .../StackTraceSampleCoordinatorITCase.java  |  8 ++--
 .../checkpoint/CoordinatorShutdownTest.java |  8 ++--
 .../ExecutionGraphMetricsTest.java  |  4 +-
 .../ExecutionGraphRestartTest.java  | 23 +--
 .../runtime/jobmanager/JobManagerTest.java  |  9 +++--
 .../flink/runtime/jobmanager/JobSubmitTest.java |  9 ++---
 .../runtime/taskmanager/TaskManagerTest.java| 41 ++--
 .../testtasks/BlockingNoOpInvokable.java| 39 +++
 .../flink/runtime/testtasks/NoOpInvokable.java  | 30 ++
 .../runtime/testtasks/WaitingNoOpInvokable.java | 34 
 .../TaskManagerLossFailsTasksTest.scala |  3 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  1 +
 .../apache/flink/runtime/jobmanager/Tasks.scala | 20 --
 .../JobSubmissionFailsITCase.java   |  6 +--
 .../JobManagerHACheckpointRecoveryITCase.java   |  4 +-
 .../JobManagerHAJobGraphRecoveryITCase.java |  4 +-
 .../jobmanager/JobManagerFailsITCase.scala  |  2 +-
 .../taskmanager/TaskManagerFailsITCase.scala|  3 +-
 18 files changed, 170 insertions(+), 78 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
--
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..d74af08 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
final int parallelism = 1;
 
final JobVertex task = new JobVertex("Task");
-   
task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+   task.setInvokableClass(BlockingNoOpInvokable.class);
task.setParallelism(parallelism);
 
jobGraph.addVertex(task);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..a346a80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.Job

flink git commit: [FLINK-4709] [core] Fix resource leak in InputStreamFSInputWrapper

2016-10-16 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.1 9f7269808 -> fe464b424


[FLINK-4709] [core] Fix resource leak in InputStreamFSInputWrapper

This closes #2581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe464b42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe464b42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe464b42

Branch: refs/heads/release-1.1
Commit: fe464b42483399a5dee5116699f600c551c37db9
Parents: 9f72698
Author: Holger Frydrych 
Authored: Mon Oct 3 14:34:19 2016 +0200
Committer: Stephan Ewen 
Committed: Sun Oct 16 19:11:11 2016 +0200

--
 .../common/io/InputStreamFSInputWrapper.java|  5 +++
 .../io/InputStreamFSInputWrapperTest.java   | 38 
 2 files changed, 43 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fe464b42/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
index cfd94bc..f7db680 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
@@ -42,6 +42,11 @@ public class InputStreamFSInputWrapper extends 
FSDataInputStream {
}
 
@Override
+   public void close() throws IOException {
+   this.inStream.close();
+   }
+
+   @Override
public void seek(long desired) throws IOException {
if (desired < this.pos) {
throw new IllegalArgumentException("Wrapped 
InputStream: cannot search backwards.");

http://git-wip-us.apache.org/repos/asf/flink/blob/fe464b42/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
new file mode 100644
index 000..8fcd231
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.io.InputStream;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class InputStreamFSInputWrapperTest {
+
+   @Test
+   public void testClose() throws Exception {
+   InputStream mockedInputStream = mock(InputStream.class);
+   InputStreamFSInputWrapper wrapper = new 
InputStreamFSInputWrapper(mockedInputStream);
+   wrapper.close();
+   verify(mockedInputStream).close();
+   }
+
+}



[1/2] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic [Forced Update!]

2016-10-16 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/flip-6 928800569 -> 2486d3787 (forced update)


http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
new file mode 100644
index 000..66d8102
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+/**
+ * Exception indicating that the slot allocation on the task manager failed.
+ */
+public class SlotAllocationException extends TaskManagerException {
+
+   private static final long serialVersionUID = -4764932098204266773L;
+
+   public SlotAllocationException(String message) {
+   super(message);
+   }
+
+   public SlotAllocationException(String message, Throwable cause) {
+   super(message, cause);
+   }
+
+   public SlotAllocationException(Throwable cause) {
+   super(cause);
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 42cb919..88123b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,10 +70,7 @@ public class TaskSlotTable implements 
TimeoutListener {
 
/** Interface for slot actions, such as freeing them or timing them out 
*/
private SlotActions slotActions;
-
-   /** The timeout for allocated slots */
-   private Time slotTimeout;
-
+   
/** Whether the table has been started */
private boolean started;
 
@@ -104,7 +101,6 @@ public class TaskSlotTable implements 
TimeoutListener {
slotsPerJob = new HashMap<>(4);
 
slotActions = null;
-   slotTimeout = null;
started = false;
}
 
@@ -112,11 +108,9 @@ public class TaskSlotTable implements 
TimeoutListener {
 * Start the task slot table with the given slot actions and slot 
timeout value.
 *
 * @param initialSlotActions to use for slot actions
-* @param initialSlotTimeout to use for slot timeouts
 */
-   public void start(SlotActions initialSlotActions, Time 
initialSlotTimeout) {
+   public void start(SlotActions initialSlotActions) {
this.slotActions = 
Preconditions.checkNotNull(initialSlotActions);
-   this.slotTimeout = 
Preconditions.checkNotNull(initialSlotTimeout);
 
timerService.start(this);
 
@@ -129,7 +123,6 @@ public class TaskSlotTable implements 
TimeoutListener {
public void stop() {
started = false;
timerService.stop();
-   slotTimeout = null;
slotActions = null;
}
 
@@ -144,9 +137,10 @@ public class TaskSlotTable implements 
TimeoutListener {
 * @param index of the task slot to allocate
 * @param jobId to allocate the task slot for
 * @param allocationId identifying the allocation
+* @param slotTimeout until the slot times out
 * @return True if the task slot could be allocated; otherwise false
 */
-   public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId) {
+   public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, Time slotTimeout) {
  

[2/2] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic

2016-10-16 Thread trohrmann
[FLINK-4360] [tm] Implement TM -> JM registration logic

Upon requesting a slot for a new job, the TaskManager registers this job at the
JobLeaderService. The job leader service is responsible for monitoring job leader
changes for all registered jobs. In case of a new job leader, the service will
try to establish a connection to the new job leader. Once the connection is
established, the task manager is informed about it. The task manager will then
offer all allocated but not yet active slots to the new job leader.

Implement JobLeaderService

The JobLeaderService is responsible for establishing a connection to the JM 
leader of a given
job.

Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots

Add simple task submission test

Add job leader detection test case

Add task slot acceptance test

Fix RpcCompletenessTest

Add comments

This closes #2640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2486d378
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2486d378
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2486d378

Branch: refs/heads/flip-6
Commit: 2486d3787b0d45a6220d3dcc96ca665986c47837
Parents: 9da76dc
Author: Till Rohrmann 
Authored: Wed Oct 5 17:02:06 2016 +0200
Committer: Till Rohrmann 
Committed: Sun Oct 16 10:47:40 2016 +0200

--
 .../org/apache/flink/util/ReflectionUtil.java   | 110 
 .../deployment/TaskDeploymentDescriptor.java|  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +
 .../HighAvailabilityServices.java   |   3 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../jobmaster/JMTMRegistrationSuccess.java  |  45 ++
 .../flink/runtime/jobmaster/JobMaster.java  |  19 +
 .../runtime/jobmaster/JobMasterGateway.java |  36 ++
 .../registration/RegisteredRpcConnection.java   |   2 +-
 .../resourcemanager/ResourceManager.java|   2 +-
 .../slotmanager/SlotManager.java|   8 +-
 .../runtime/taskexecutor/JobLeaderListener.java |  60 +++
 .../runtime/taskexecutor/JobLeaderService.java  | 390 ++
 .../taskexecutor/JobManagerConnection.java  |  23 +-
 .../runtime/taskexecutor/JobManagerTable.java   |  59 +++
 .../runtime/taskexecutor/TaskExecutor.java  | 522 ++-
 .../taskexecutor/TaskExecutorGateway.java   |  25 +-
 ...TaskExecutorToResourceManagerConnection.java |   5 +
 .../runtime/taskexecutor/TaskManagerRunner.java |   2 +
 .../taskexecutor/TaskManagerServices.java   |  24 +-
 .../exceptions/SlotAllocationException.java |  39 ++
 .../taskexecutor/slot/TaskSlotTable.java|  39 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../TaskDeploymentDescriptorTest.java   |   6 +-
 .../TestingHighAvailabilityServices.java|   2 +-
 .../metrics/groups/TaskManagerGroupTest.java|  10 +-
 .../slotmanager/SlotManagerTest.java|   2 +-
 .../slotmanager/SlotProtocolTest.java   |   8 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  29 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 431 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../runtime/taskmanager/TaskManagerTest.java|  75 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java |  23 +-
 .../tasks/InterruptSensitiveRestoreTest.java|  38 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  25 +-
 37 files changed, 1805 insertions(+), 283 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index b851eba..2883570 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 
 @Internal
 public final class ReflectionUtil {
@@ -151,6 +154,113 @@ public final class ReflectionUtil {
}
 
/**
+* Extract the full template type information from the given type's 
template parameter at the
+* given position.
+*
+* @param type type to extract the full template parameter information 
from
+* @param templatePosition describing at which position the template 
type parameter is
+* @return Full type information de

[1/2] flink git commit: [FLINK-4343] [tm] Implement TM -> JM registration logic

2016-10-16 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/flip-6 9da76dcfd -> 928800569


http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
new file mode 100644
index 000..66d8102
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+/**
+ * Exception indicating that the slot allocation on the task manager failed.
+ */
+public class SlotAllocationException extends TaskManagerException {
+
+   private static final long serialVersionUID = -4764932098204266773L;
+
+   public SlotAllocationException(String message) {
+   super(message);
+   }
+
+   public SlotAllocationException(String message, Throwable cause) {
+   super(message, cause);
+   }
+
+   public SlotAllocationException(Throwable cause) {
+   super(cause);
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 42cb919..88123b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,10 +70,7 @@ public class TaskSlotTable implements 
TimeoutListener {
 
/** Interface for slot actions, such as freeing them or timing them out 
*/
private SlotActions slotActions;
-
-   /** The timeout for allocated slots */
-   private Time slotTimeout;
-
+   
/** Whether the table has been started */
private boolean started;
 
@@ -104,7 +101,6 @@ public class TaskSlotTable implements 
TimeoutListener {
slotsPerJob = new HashMap<>(4);
 
slotActions = null;
-   slotTimeout = null;
started = false;
}
 
@@ -112,11 +108,9 @@ public class TaskSlotTable implements 
TimeoutListener {
 * Start the task slot table with the given slot actions and slot 
timeout value.
 *
 * @param initialSlotActions to use for slot actions
-* @param initialSlotTimeout to use for slot timeouts
 */
-   public void start(SlotActions initialSlotActions, Time 
initialSlotTimeout) {
+   public void start(SlotActions initialSlotActions) {
this.slotActions = 
Preconditions.checkNotNull(initialSlotActions);
-   this.slotTimeout = 
Preconditions.checkNotNull(initialSlotTimeout);
 
timerService.start(this);
 
@@ -129,7 +123,6 @@ public class TaskSlotTable implements 
TimeoutListener {
public void stop() {
started = false;
timerService.stop();
-   slotTimeout = null;
slotActions = null;
}
 
@@ -144,9 +137,10 @@ public class TaskSlotTable implements 
TimeoutListener {
 * @param index of the task slot to allocate
 * @param jobId to allocate the task slot for
 * @param allocationId identifying the allocation
+* @param slotTimeout until the slot times out
 * @return True if the task slot could be allocated; otherwise false
 */
-   public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId) {
+   public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, Time slotTimeout) {
checkInit(

[2/2] flink git commit: [FLINK-4343] [tm] Implement TM -> JM registration logic

2016-10-16 Thread trohrmann
[FLINK-4343] [tm] Implement TM -> JM registration logic

Upon requesting a slot for a new job, the TaskManager registers this job at the
JobLeaderService. The job leader service is responsible for monitoring job leader
changes for all registered jobs. In case of a new job leader, the service will
try to establish a connection to the new job leader. Once the connection is
established, the task manager is informed about it. The task manager will then
offer all allocated but not yet active slots to the new job leader.

Implement JobLeaderService

The JobLeaderService is responsible for establishing a connection to the JM 
leader of a given
job.

Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots

Add simple task submission test

Add job leader detection test case

Add task slot acceptance test

Fix RpcCompletenessTest

Add comments

This closes #2640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92880056
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92880056
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92880056

Branch: refs/heads/flip-6
Commit: 928800569234156876b9744de064cc359f121664
Parents: 9da76dc
Author: Till Rohrmann 
Authored: Wed Oct 5 17:02:06 2016 +0200
Committer: Till Rohrmann 
Committed: Sun Oct 16 10:46:09 2016 +0200

--
 .../org/apache/flink/util/ReflectionUtil.java   | 110 
 .../deployment/TaskDeploymentDescriptor.java|  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +
 .../HighAvailabilityServices.java   |   3 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../jobmaster/JMTMRegistrationSuccess.java  |  45 ++
 .../flink/runtime/jobmaster/JobMaster.java  |  19 +
 .../runtime/jobmaster/JobMasterGateway.java |  36 ++
 .../registration/RegisteredRpcConnection.java   |   2 +-
 .../resourcemanager/ResourceManager.java|   2 +-
 .../slotmanager/SlotManager.java|   8 +-
 .../runtime/taskexecutor/JobLeaderListener.java |  60 +++
 .../runtime/taskexecutor/JobLeaderService.java  | 390 ++
 .../taskexecutor/JobManagerConnection.java  |  23 +-
 .../runtime/taskexecutor/JobManagerTable.java   |  59 +++
 .../runtime/taskexecutor/TaskExecutor.java  | 522 ++-
 .../taskexecutor/TaskExecutorGateway.java   |  25 +-
 ...TaskExecutorToResourceManagerConnection.java |   5 +
 .../runtime/taskexecutor/TaskManagerRunner.java |   2 +
 .../taskexecutor/TaskManagerServices.java   |  24 +-
 .../exceptions/SlotAllocationException.java |  39 ++
 .../taskexecutor/slot/TaskSlotTable.java|  39 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../TaskDeploymentDescriptorTest.java   |   6 +-
 .../TestingHighAvailabilityServices.java|   2 +-
 .../metrics/groups/TaskManagerGroupTest.java|  10 +-
 .../slotmanager/SlotManagerTest.java|   2 +-
 .../slotmanager/SlotProtocolTest.java   |   8 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  29 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 431 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../runtime/taskmanager/TaskManagerTest.java|  75 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java |  23 +-
 .../tasks/InterruptSensitiveRestoreTest.java|  38 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  25 +-
 37 files changed, 1805 insertions(+), 283 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index b851eba..2883570 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 
 @Internal
 public final class ReflectionUtil {
@@ -151,6 +154,113 @@ public final class ReflectionUtil {
}
 
/**
+* Extract the full template type information from the given type's 
template parameter at the
+* given position.
+*
+* @param type type to extract the full template parameter information 
from
+* @param templatePosition describing at which position the template 
type parameter is
+* @return Full type information de